Kafka Producer/Consumer code can be developed quickly with Spring-Kafka: KafkaTemplate (the producer template) plus @KafkaListener (the consumer listener). I won't say much about Spring-Kafka itself here; today we focus on how to increase Kafka's consumption throughput.
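As a quick reminder of that pairing, here is a minimal sketch (the topic name and bean wiring are illustrative, not from this article):
// Producer side: send via an autowired KafkaTemplate
kafkaTemplate.send("demo-topic", "hello kafka");

// Consumer side: a method-level listener
@KafkaListener(topics = "demo-topic")
public void listen(String message) {
    log.info("Received: {}", message);
}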
1. Simple consumers
1.1 Configure the consumerFactory
The first step is to configure the consumer properties
@Bean(BeanNameConstant.CONSUMER_FACTORY)
public ConsumerFactory<String, String> consumerFactory() {
    final StringDeserializer stringDeserializer = new StringDeserializer();
    Map<String, Object> props = new HashMap<>(10);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    // Whether to auto-commit offsets; since Spring Kafka 2.3 the default is false
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 300000);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, autoOffsetReset);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props, stringDeserializer, stringDeserializer);
}
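The snippet above assumes bootstrapServers and autoOffsetReset are fields injected elsewhere in the configuration class. A minimal sketch of that wiring (the property keys are my assumption, using Spring Boot's standard names):
// Assumed injected configuration fields, not shown in the original snippet
@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.consumer.auto-offset-reset}")
private String autoOffsetReset;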
1.2 Configure the KafkaListenerContainerFactory
ConcurrentKafkaListenerContainerFactory is where the main consumer wrapping happens. KafkaConsumer itself is not thread-safe and cannot be operated concurrently, so Spring adds a wrapping layer here: according to the spring.kafka.listener.concurrency configuration it generates that many concurrent KafkaMessageListenerContainer instances.
@Bean(BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY)
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory(
        @Qualifier(BeanNameConstant.CONSUMER_FACTORY) ConsumerFactory<String, String> consumerFactory) {
    // This factory creates the listener container for each topic by topic name
    ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
            new ConcurrentKafkaListenerContainerFactory<>();
    concurrentKafkaListenerContainerFactory.setConsumerFactory(consumerFactory);
    // Do not ack automatically when the listener throws
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckOnError(false);
    // Ack manually; different ack modes treat offsets and errors differently
    concurrentKafkaListenerContainerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
    return concurrentKafkaListenerContainerFactory;
}
1.3 The consumer
@KafkaListener(topics = "${kafka-topic.demo}", containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY, concurrency = "1")
public void loadListener(ConsumerRecord<?, ?> record, Acknowledgment ack) {
    try {
        // Business method
        dealMessage(JsonUtil.readValue(String.valueOf(record.value()), Demo.class));
    } catch (Exception e) {
        log.error("Failed to consume message", e);
    } finally {
        // Manually commit the ack
        ack.acknowledge();
    }
}
The ConsumerRecord class carries the partition information, message headers, message body, and so on, so it is a good choice when your business needs those fields. If you only care about the message body, receiving it as a concrete type such as String is more convenient. Usually we consume with ConsumerRecord.
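For illustration, a listener that only needs the payload can declare it directly. This is a minimal sketch reusing the names above; with the String value deserializer configured earlier, Spring-Kafka passes the record value as the payload:
// Hypothetical variant of the listener above: receive the body as a String
@KafkaListener(topics = "${kafka-topic.demo}", containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY)
public void loadStringListener(String message, Acknowledgment ack) {
    try {
        dealMessage(JsonUtil.readValue(message, Demo.class));
    } finally {
        ack.acknowledge();
    }
}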
Note that after a rebalance the consumer pulls the offset for its partitions again, so repeated consumption is possible.
2. Multiple consumers
As shown above, @KafkaListener has a concurrency attribute, and ConcurrentKafkaListenerContainerFactory has the corresponding member variable: we can set it globally when configuring the KafkaListenerContainerFactory, and we can also override that configuration on each @KafkaListener.
/**
* Specify the container concurrency.
* @param concurrency the number of consumers to create.
* @see ConcurrentMessageListenerContainer#setConcurrency(int)
*/
public void setConcurrency(Integer concurrency) {
this.concurrency = concurrency;
}
Its role is to create n KafkaMessageListenerContainer instances, that is, n KafkaConsumers. This is the key to consuming with multiple consumers.
@KafkaListener(
topics = "${kafka-topic.demo}",
containerFactory = BeanNameConstant.KAFKA_LISTENER_CONTAINER_FACTORY,
concurrency = "12"
)
So can we just raise the concurrency value to get multithreaded consumption? Is it that simple? The answer is no.
2.1 Concurrency Setting
The concurrency setting depends on the number of partitions in the Kafka topic, because each partition is consumed by at most one KafkaMessageListenerContainer.
If your topic has only 8 partitions, at most 8 of your containers will actually do any work. Note that in a distributed deployment you must also factor in the number of nodes: with two nodes, each should set concurrency to 4.
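For example, the global default can go in application.properties; spring.kafka.listener.concurrency is Spring Boot's standard key, and the numbers below are illustrative:
# 8-partition topic consumed by 2 application nodes: 4 containers per node
spring.kafka.listener.concurrency=4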
3. Multithreaded batch consumption
With the concurrency setting we have achieved multithreaded consumption and are already faster than before. To go faster still, we cannot increase the partition count endlessly: partitions can be added but never removed. So we take a different route.
The official batch interfaces look like this. That is, where we used to receive a single ConsumerRecord, we now receive a List<ConsumerRecord>.
public interface BatchMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data);
}

public interface BatchAcknowledgingMessageListener<K, V> {
    void onMessage(List<ConsumerRecord<K, V>> data, Acknowledgment acknowledgment);
}
Before that, we need to adjust the consumer configuration.
3.1 Configuration Adjustment
3.1.1 Configure the consumerFactory
// Maximum number of records returned in a single poll
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
// Maximum interval allowed between two polls, in milliseconds
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 1200000);
- max.poll.records controls how many records are pulled per poll.
- max.poll.interval.ms is the maximum interval allowed between two polls.
The max.poll.interval.ms parameter matters a lot for batch consumption. If it is set too short, the interval expires before the consumer has committed the offsets of the current batch, which triggers a rebalance on the consumer side; messages that have already been consumed are then assigned to other consumers and consumed again, and you fall into a loop in which every consumer keeps re-consuming the same offsets. The result is message backlog plus repeated consumption. In my experience, setting a very large value does no harm: max.poll.interval.ms is only an upper bound, and the consumer is free to poll again long before it elapses.
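A rough sizing check (the per-record time is illustrative, not measured): with max.poll.records = 10000 and an average of 10 ms of processing per record in a single thread, one batch takes about 100 s, i.e. 100000 ms; the 1200000 ms (20 minutes) used above therefore leaves plenty of headroom even before multithreading speeds the batch up.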
3.1.2 Configure the KafkaListenerContainerFactory
// This factory creates the listener container for each topic by topic name
ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory =
        new ConcurrentKafkaListenerContainerFactory<>();
...
// Enable batch consumption
concurrentKafkaListenerContainerFactory.setBatchListener(true);
3.2 Code Implementation
// The maximum number of records processed per thread
private static final int MAX_NUM = 100;

@KafkaListener(groupId = "${spring.kafka.consumer.group-id}", topics = "${kafka-topic}", concurrency = "3")
public void loadListener(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
    final long startTime = System.currentTimeMillis();
    int batchSize = record.size() / MAX_NUM;
    // A large backlog may arrive in one poll, so commit the offset only after all async threads have finished
    if (batchSize == 0) {
        // Fewer than 100 messages: process them in the listener thread
        this.dealMessage(record);
    } else {
        LinkedList<Future<?>> futures = new LinkedList<>();
        for (int i = 0; i < batchSize; i++) {
            List<ConsumerRecord<?, ?>> records;
            if (i == batchSize - 1) {
                // The last slice also takes the remainder
                records = record.subList(i * MAX_NUM, record.size());
            } else {
                records = record.subList(i * MAX_NUM, (i + 1) * MAX_NUM);
            }
            final Future<?> submit = executorService.submit(() -> this.dealMessage(records));
            futures.add(submit);
        }
        // Wait for all threads to finish executing
        for (Future<?> future : futures) {
            try {
                future.get();
            } catch (InterruptedException e) {
                // ignore
            } catch (ExecutionException e) {
                // ignore
            }
        }
    }
    log.info("Batch processing completed, count={}, time={}ms", record.size(), System.currentTimeMillis() - startTime);
    // Manually commit the offset
    ack.acknowledge();
}

/**
 * Batch processing.
 * @param record the records in this slice
 */
public void dealMessage(List<ConsumerRecord<?, ?>> record) {
    for (ConsumerRecord<?, ?> consumerRecord : record) {
        // Business logic
        this.dealMessage(consumerRecord);
    }
}
The idea of my code is to pull messages in batches and consume them across multiple threads. If we care about data safety and accuracy, we commit the offset only after the entire batch has been processed. If we don't, we can also just hand the records to an asynchronous thread pool and commit the offset right away.
public void loadListener(List<ConsumerRecord<?, ?>> record, Acknowledgment ack) {
    final long startTime = System.currentTimeMillis();
    // Only acceptable when message loss is not a concern
    for (ConsumerRecord<?, ?> consumerRecord : record) {
        executorService.execute(() -> dealMessage(consumerRecord));
    }
    log.info("Batch processing completed, count={}, time={}ms", record.size(), System.currentTimeMillis() - startTime);
    // Commit the offset immediately
    ack.acknowledge();
}
Either way, the performance gains will be significant. However, the batch size per poll and the thread settings depend on your actual situation: during testing, keep an eye on CPU, memory, and DB monitoring and adjust in time. Don't kill the rest of the business by blindly chasing data-processing throughput; the premise of performance optimization must be the stability of the system. Good luck to you.
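One last note: the executorService used in the snippets above is never defined in them. A minimal sketch of a bounded pool, where the sizes and the rejection policy are my assumptions rather than the original author's:
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// A bounded pool: when the queue is full, CallerRunsPolicy makes the listener thread
// run the task itself, which naturally slows polling instead of buffering unbounded work
private final ExecutorService executorService = new ThreadPoolExecutor(
        8, 8,                          // core and max threads: tune to CPU and downstream capacity
        60L, TimeUnit.SECONDS,
        new LinkedBlockingQueue<>(1000),
        new ThreadPoolExecutor.CallerRunsPolicy());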