Preface

Kafka is a popular distributed message queue, often used for asynchronous communication between microservices and for decoupling business logic. Kafka itself is very fast, but the throughput of a single microservice instance is capped. To scale horizontally with business volume, we deploy multiple instances of the same microservice as distributed producers and consumers. Since all instances of a service share the same input and output topics, we can use Kafka partitioned consumption to distribute the load among them.

The business scenario

The system we develop is an IoT platform: a large number of devices send data to it in real time, at second-level and minute-level intervals. The pipeline consists of three modules: access, processing, and storage, with Kafka used to transfer data between them. The processing module contains multiple services, each performing a separate data-processing step. Some of these steps take longer than others, so a single service instance cannot keep up with the high-frequency input. These services are therefore deployed as multiple distributed instances that consume in parallel.

Business implementation

Not specifying partitions

When we send a message to Kafka without specifying a partition, we do not need to create the topic manually: Kafka automatically creates a topic with one partition, as follows:

@Service
public class ProductService {

    @Autowired
    private KafkaTemplate kafkaTemplate;
    
    public void send(String msg, String topic) {
        kafkaTemplate.send(topic, msg);
    }
}

Specifying the partition

Initialize and configure a topic partition

When sending to a specific partition, if the topic has not been configured with multiple partitions and a partition > 0 is specified, an error is returned indicating that the partition does not exist. In this case we need to create the topic and its partitions in advance. There are two options:

  • Manually create the topic with Kafka Tool before the service starts (not recommended ✗)

  • Automatically create the topic with a Kafka client when the service starts (recommended √)

/**
 * Initializes multi-partition topics, based on Spring Boot 2
 */
@Component
public class TopicInitRunner implements ApplicationRunner {

    @Autowired
    private AdminClient adminClient;

    @Override
    public void run(ApplicationArguments args) throws Exception {
        // Key: topic name, value: number of partitions
        Map<String, Integer> topicPartitionMap = new HashMap<>();
        for (Map.Entry<String, Integer> e : topicPartitionMap.entrySet()) {
            createTopic(e.getKey(), e.getValue());
        }
    }

    public void createTopic(String topic, int partition) {
        // NewTopic requires a replication factor; 1 is used here for a single-broker setup
        NewTopic newTopic = new NewTopic(topic, partition, (short) 1);
        adminClient.createTopics(Lists.newArrayList(newTopic));
    }
}

@Configuration
public class KafkaConfig {

    @Value("${spring.kafka.bootstrap-servers}")
    private String servers;

    @Bean
    public AdminClient adminClient() {
        return AdminClient.create(kafkaAdmin().getConfig());
    }

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
        return new KafkaAdmin(props);
    }
}

Producer partition sending scheme

We use TopicInitRunner to create a topic named partition-topic with three partitions. Next we discuss how to send messages evenly to each partition, so that load is balanced across the multiple consumer instances:

  • 1. Each message is reported by a device and carries a device ID. First, assign each device an auto-incrementing number
  • 2. Take that number modulo the partition count to pick the target partition. The code is as follows:
public class ProductService {
    /**
     * data indicates the data to be sent
     */
    public void partitionSend(String topic, int partition, JSONObject data) {
         // Obtain the device ID
        String deviceId = data.getString("deviceId");
        // Get the increment if it is a new device, create one and put it in the cache
        int inc = getDeviceInc(deviceId);
        // With 3 partitions, a device with increment id 1 maps to partition 1 (floorMod(1, 3) = 1),
        // so 1000 devices send roughly 1000/3 of the data to each partition
        int targetPartition = Math.floorMod(inc, partition);
        // When sending to a specific partition, a unique key is required;
        // a UUID or a snowflake-algorithm ID can be used
        kafkaTemplate.send(topic, targetPartition, getUuid(), data.toJSONString());
    }
}
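The getDeviceInc helper referenced above is not shown in the article. A minimal sketch (hypothetical, using an in-memory ConcurrentHashMap cache; a real multi-instance deployment would need a shared cache such as Redis so all producers agree on the numbering):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class DeviceIncCache {

    // deviceId -> auto-incrementing number
    private final Map<String, Integer> cache = new ConcurrentHashMap<>();
    private final AtomicInteger counter = new AtomicInteger(0);

    public int getDeviceInc(String deviceId) {
        // Assign the next number the first time a device is seen; reuse it afterwards
        return cache.computeIfAbsent(deviceId, id -> counter.getAndIncrement());
    }

    public static void main(String[] args) {
        DeviceIncCache c = new DeviceIncCache();
        int partitions = 3;
        int[] counts = new int[partitions];
        // 9 distinct devices get increments 0..8, so floorMod spreads 3 per partition
        for (int i = 0; i < 9; i++) {
            int inc = c.getDeviceInc("device-" + i);
            counts[Math.floorMod(inc, partitions)]++;
        }
        System.out.println(counts[0] + "," + counts[1] + "," + counts[2]); // prints 3,3,3
    }
}
```

Because the increment is stable per device, all data from one device also lands in the same partition, which preserves per-device ordering.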

Consumers

We mentioned that consumers are deployed as distributed microservices with multiple instances. We simply create as many service instances as the topic has partitions, and Kafka automatically assigns one partition's data to each instance.

We use batch consumption to further improve throughput. The consumer and its configuration are shown below; for the configuration file, refer to the standard Spring Boot Kafka properties, which mainly cover the Kafka servers plus the consumer and producer settings. The core code is:

@Component
public class DataListener {

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Site packet listening consumer
     *
     * @param records
     */
    @KafkaListener(topics = "partition-topic", containerFactory = "batchConsumerFactory")
    public void iotSiteHistoryMessageConsumer(List<ConsumerRecord<String, String>> records) {
        // Process the batch of records here
    }

    /**
     * Consumer configuration
     */
    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }
    
    /** * Batch consumption configuration */
    @Bean
    public KafkaListenerContainerFactory batchConsumerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerConfigs()));
        factory.setBatchListener(true);
        return factory;
    }
}
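The fields groupId, bootstrapServers, and maxPollRecords used above would typically be bound from the configuration file via @Value. A sketch of the corresponding application.yml (the values shown are illustrative assumptions, not from the original article):

```yaml
spring:
  kafka:
    bootstrap-servers: localhost:9092   # assumed broker address
    consumer:
      group-id: iot-data-group          # assumed consumer group id
      max-poll-records: 500             # max records returned per poll, tune to batch size
```

All instances of the same service must share the same group-id so that Kafka treats them as one consumer group and splits the partitions among them.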