This is the 21st day of my participation in the August More Text Challenge

preface

KafkaConsumer can be used to subscribe to topics and pull messages from the subscribed topics. In Kafka, there are more groups of consumers than producers, also known as consumer groups, to speed up the consumption of a single machine. This article introduces the concept of consumers and consumer groups, and then goes into more detail about client development.

I. Consumers and consumer groups

Consumers subscribe the theme of Kafka, and pull information from above, but unlike producers is that it increases the concept of consumer groups, this is because most of the time of Kafka consumers in the consumer information often do some high delay action, such as data is written to the database, it reads the data processing, etc., This is much slower than producer, so the increase of the consumer group is used to increase Kafka’s consumption power. When a message on the same topic comes in again, the message is shared by the same consumer group.

1.1 Graphic consumer model

Now, let’s take a look at the process of consumption.

Case 1:

For example, there is a printing service in the company, assuming that there are 6 printing partitions, corresponding to color printing Word, color printing Excel, color printing PPT, black and white Word, black and white Excel, black and white PPT six partition content, at this time only one printer. The following

Situation 2:

But the results of the printer were so slow that many people queued up in the printing room all day long to print documents. At this point, the company buys a new printer and has them handle print requests separately, placing them in a group of consumers and consuming data from those partitions at the same time.

At this point, they process the data allocated to the partition separately, without logically interfering with each other. Messages in the same topic are published to only one consumer in the consumer group.

Situation 3:

At this point, the company wanted to add a print backup function, so it purchased a printer to print all the printed files simultaneously. (PS: I don’t know what kind of weird company this is, just for the sake of the scene ~)

As follows:

Data from each partition is then sent to consumer group B, meaning messages from the same partition can be consumed by consumers from different consumer groups.

Situation 4:

At this point, in order to prepare for the financing, the company flexed its muscles to investors, so it invited the purchase of five more printers, this time the scene is as follows:

Although the consumer and consumer group model allows for horizontal scalability of overall consumption power, adding consumers does not necessarily improve consumption power in the case of fixed partitions, as shown in the figure, where a printer cannot be allocated to partitions and cannot consume data.

1.2 Message delivery mode

The two modes of message queuing mentioned earlier are point-to-point and publish-subscribe. Kafka supports both modes. This is the key understanding.

  • The point-to-point mode is based on queues, which is similar to the data in the same consumer group. The producer sends data to the partition, and then the consumer pulls the message from the partition for consumption. At this time, the message can only be consumed once by the consumers in the same consumer group.
  • The publish-subscribe pattern is that partitioned messages in Kafka can be consumed by different consumer groups. This is the application of one-to-many broadcast mode.

Of course, the consumer group is a logical concept, configured with the client parameter group.id, which defaults to an empty string. The consumer is not a logical concept. It is an entity that actually consumes data, be it a thread or a machine.

Ok, now that we understand the concept of consumers and consumer groups, let’s open the Pandora’s box of the consumer client.

Second, Kafka consumer applications

Again, the consumer is dependent on Kafak’s client, and the normal consumption logic is as follows:

  • 1. Configure consumer client parameters and create corresponding consumer instances
  • 2. Subscribe to topics
  • 3. Pull messages and consume them
  • 4. Submit the consumption shift
  • 5. Close the consumer instance

So this shift right here we don’t know exactly what it means, but don’t worry, we’ll talk about it later, but let’s look at the next typical consumer and what it’s going to look like.

2.1 Consumer client demo

public class Consumer { public static void main(String[] args) { Properties props = new Properties(); Props. The put (" the bootstrap. The servers, "" 192.168.81.101:9092"); props.put("group.id", "test"); / / consumer groups props. The put (" key. The deserializer ", "org.apache.kafka.com mon. Serialization. StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("xiaolei2")); while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}}}Copy the code

2.2 Configuring Required Parameters

When creating a consumer, Kafka has four mandatory parameters, one more than the producer.

  • Bootstrap. servers: This parameter specifies the list of broker addresses to connect to the Kafka cluster. It can be either a single address or a comma separated Kafka cluster address.

  • Deserializer for key and Value. Deserializer: The key and value are serialized to generate byte arrays during message sending. Therefore, data needs to be deserialized to the original data when consuming data.

  • Group. id: the name of the group to which the consumer belongs. The default value is “”. If set to null, an exception will be thrown

    Exception in thread "main" org.apache.kafka.common.errors.InvalidGroupIdException: To use the group management or offset commit APIs, you must provide a valid group.id in the consumer configuration.
    Copy the code

2.3 Subscribing to Topics and Partitions

Once a consumer is created, we need to subscribe to related topics for it. A consumer can subscribe to one or more topics. There are two apis you can use here

  • Consumer.subscribe (Collection topics) : indicates a Collection of topics to subscribe to;
  • Consumer.subscribe (Pattern Pattern) : Uses the re to match the collection to subscribe to.

It is also easy to understand that Kafka can use regular expressions to match related topics, such as the following:

consumer.subscribe(Pattern.compile("topic-.*"));
Copy the code

However, if the definition of consumer is repeated, it will be the same as the following. The following subscription is xiaolei3.

consumer.subscribe(Arrays.asList("xiaolei2"));
consumer.subscribe(Arrays.asList("xiaolei3"));
Copy the code

After subscribing to the topic, let’s talk about how it defines partitions.

Subscribe directly to specific partitions.

 consumer.assign(Arrays.asList(new TopicPartition("xiaolei2",0)));
Copy the code

We’re using our assing approach to subscribe to a specific partition. So what if you don’t know what the partitions are?

You can use KafkaConsumer’s partitionsFor() method to query metadata information for a given topic.

The following implementation:

        consumer.assign(Arrays.asList(new TopicPartition("xiaolei2",0)));
        ArrayList<TopicPartition> topicPartitions = new ArrayList<>();
        List<PartitionInfo> partitionInfos = consumer.partitionsFor("xiaolei2");
        for (PartitionInfo partitionInfo : partitionInfos) {
            topicPartitions.add(new TopicPartition(partitionInfo.topic(),partitionInfo.partition()));
        }
        consumer.assign(topicPartitions);
Copy the code

Finally, consumption in Kafka is pull-based, and messages are consumed in two ways,

  • One is push: the server actively sends messages to consumers, such as the sending of articles on wechat public accounts
  • One is poll: a consumer initiates an active request to the server for a fetch.

Kafka only requires the polling API to periodically request data from the server, and once the consumer subscribes to the topic, polling takes care of all the details, such as sending heartbeats, fetching data, partitioning rebalancing, and so on. We take care of business.

3. Consumption displacement

3.1 What is an offset

For Kafka’s partitions, each message has a unique offset, a monotonically increasing integer, that shows the corresponding position of the message in the partition. After version 0.9 Kafka offsets are stored in the _consumer_offsets theme of Kafka. The consumer submits a consumption shift to this topic after consuming the message. Consumers start consuming messages from the new consumption shift when they restart.

Because the shift commit is performed after all the pulled messages are consumed, data loss or repeated consumption can occur if the offset is not committed correctly.

  • If an exception occurs while consuming x+2, a failure occurs, and after the failure is recovered, the re-pull message still starts at X, then the data from x to X +2 will be consumed again.
  • If the offset is committed before the consumption reaches x+2 and the message is not consumed, then the fault occurs and the consumption starts from the new offset x+5 after the restart, then the message between x+2 and x+5 will be lost.

Therefore, it is important to know when the commit offset is significant. In Kafka, the commit offset is either manual or automatic.

3.2 Automatic submission of offsets

The default commit mode for consumption shifts in Kafka is automatic commit. This is configured in the consumer client parameter enable.auto.mit, which defaults to true. It is the maximum message offset that a poll pull down is periodically submitted to _comsumer_offsets. The periodic time is set on auto.mit.interval. ms. The default time is 5s.

While auto-commit is a very convenient way to simplify coding, auto-commit is problematic, as we mentioned above, for data loss and repeated consumption. Therefore, Kafka provides manual submission of the amount of shift, more flexible processing of consumption shift.

3.3 Manually Submitting an Offset

To enable manual submission, disable automatic submission and set enable.auto.mit to false.

Depending on user needs, this offset value can be divided into two types:

  • Normally, manually commit the maximum offset to the pull.
  • Manually submit an offset of fixed value.

There are two methods to manually commit offset: commitSync and commitAsync. The similarity between them is that the highest offset of a batch of data of this poll will be submitted. The difference is that commitSync blocks the current thread until a commit is successful, and automatically retries the failure (commit failures can also occur due to factors beyond your control). CommitAsync does not have a retry mechanism, so the commit may fail.

3.3.1 Synchronizing the Offset submission

Synchronous submission of offsets is more reliable because it has a retry mechanism.

public class CustomComsumer { public static void main(String[] args) { Properties props = new Properties(); //Kafka cluster props. Put ("bootstrap. Servers ", "hadoop102:9092"); // props. Put ("group.id", "test"); // props. props.put("enable.auto.commit", "false"); / / close automatically submitted to offset props. The put (" key. The deserializer ", "org.apache.kafka.com mon. Serialization. StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first")); // Consumer subscription theme while (true) {ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); Consumer.com mitSync(); }}}Copy the code
3.3.2 Submitting offset asynchronously

It is more reliable to commit offset synchronously, but because it blocks the current thread until the commit succeeds. Throughput is therefore significantly affected. Therefore, in more cases, asynchronous submission of offset is used.

The following is an example of an asynchronous submission of offset:

public class CustomConsumer { public static void main(String[] args) { Properties props = new Properties(); //Kafka cluster props. Put ("bootstrap. Servers ", "hadoop102:9092"); // props. Put ("group.id", "test"); // props. // Disable the automatic submission of offset props. Put (" enable.auto.mit ", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("first")); // Consumer subscription theme while (true) {ConsumerRecords<String, String> records = consumer.poll(100); For (ConsumerRecord<String, String> record: records) { System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); @override public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception exception) { if (exception ! = null) { System.err.println("Commit failed for" + offsets); }}}); }}}Copy the code

Asynchronous commits improve the throughput of your program because you can request data without waiting for a response.

Asynchronous commits also fail. Suppose the first commits a displacement of 100, but fails, and the second commits a displacement of 200, what will happen?

If you try again and commit the 100 shift again, this time the commit succeeds, the 200 shift will be overwritten and become 100. So you’re going to have duplication of spending, and you’re going to start spending at 100.

Therefore, for this reason, a synchronous + asynchronous combination can be used, in which a 200 shift can only be committed after a 100 commit until the request is successful.

3.3.3 Synchronous and Asynchronous Submission

Asynchronous commits are used in normal polling to ensure throughput, but synchronous commits are used to ensure the final commit is successful before the final consumer is shut down or after an exception occurs. This is a check done at the end.

Try {while (true) {// Pull message logic processing // async (); } } catch (Exception e) { e.printStackTrace(); Consumer.com mitSync(); } finally { consumer.close(); }}Copy the code

3.4 Specify displacement consumption

Because of the existence of consumption displacement, we can find the storage displacement position and start consumption when consumers shut down, restart and rebalance, but consumption displacement does not exist at the beginning, such as the following situations:

  • 1. When a new consumer group is established
  • 2. A consumer in the consumer group subscribes to a new topic;
  • 3. _comsumer_offsets Subject displacement information expired was removed

In these cases Kafka cannot find the consumption shift, so it determines where to start consumption based on the configuration of the client parameter auto-.offset. reset, which defaults to latest.

  • Earliest: When there is a submitted offset under each partition, the money will be consumed from the submitted offset; If there is no offset submitted, the consumption starts from scratch.
  • Latest: Consumes the submitted offset when each partition has an offset. If there is no committed offset, the newly generated data under the partition is consumed (default).
  • None: If each partition has a committed offset, the consumption starts after the offset. As long as there is a partition does not exist submitted offset, directly thrown NoOffsetForPartitionException exception;

Kafka’s auto-.offset. reset argument only allows us to consume from the beginning or the end of a coarse-grained pull message. It does not specify an exact shift to start the pull message. This provides a great deal of flexibility in consuming messages. Seek () can also store message shifts in external storage via storeOffsetToDB, and can be used with rebalancing listeners to provide more precise consumption power.

3.4.1 seek specifies displacement consumption

The seek method is defined as follows:

public void seek(TopicPartition partition, long offset)
Copy the code
  • Partition indicates a partition.
  • Offset indicates where the consumption starts in the partition
afkaConsumer<String, String> consumer = new KafkaConsumer<>(props); consumer.subscribe(Arrays.asList("xiaolei2")); consumer.poll(Duration.ofMillis(10000)); Set<TopicPartition> assignment = consumer.assignment(); for (TopicPartition tp : assignment) { consumer.seek(tp,100); } while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}Copy the code

The seek() method can only reset the consumption position of the partition to which the consumer is assigned, and the allocation of the partition is implemented during the call of the poll() method. That is, the poll() method needs to be executed before the seek() method is executed and the consumption position can be reset after the partition is allocated.

Therefore, set a time in the poll () method to wait for the partition to complete, and then get the partition information for data consumption through the Assignment () method.

If set to 0 in the poll () method, the partition will not be retrieved. If this time is too long, it will also cause unnecessary waiting. Let’s take a look at the optimal solution.

3.4.2 seek specifies displacement consumption optimization
consumer.subscribe(Arrays.asList("xiaolei2")); Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size()==0){ consumer.poll(Duration.ofMillis(100)); assignment=consumer.assignment(); } for (TopicPartition tp : assignment) { consumer.seek(tp,100); } while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records){ System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value()); }}Copy the code
3.4.3 Seek consumes from the beginning or end of the partition

If the consumer in the consumer group can find the consumption shift at startup, the auto-.offset. reset parameter will not work unless the shift is out of bounds. If you want to specify the start or end of consumption, you also need seek().

To consume at the specified displacement, you need to obtain the offset at the beginning or end of each partition. The beginningOffsets() and endOffsets() methods can be used.

Set<TopicPartition> assignment = new HashSet<>(); // Performs partition allocation logic inside the poll() method, which loops to ensure that the partition has been allocated. // Enter this loop when the partition message is 0. If it is not 0, the partition has been successfully allocated. while (assignment.size() == 0) { consumer.poll(100); The assignment() method is used to get the value of the partition message assigned by the consumer: topic-demo-3, topic-demo-0, topic-demo-2, topic-demo-1 assignment = consumer.assignment(); } / / specified partition Map from consumption < TopicPartition, Long > beginOffsets = consumer. BeginningOffsets (the assignment). for (TopicPartition tp : assignment) { Long offset = beginOffsets.get(tp); System.out.println(" partition "+ tp +" from "+ offset + "); consumer.seek(tp, offset); } <TopicPartition, Long> endOffsets = consumer.endoffsets (assignment); for (TopicPartition tp : assignment) { Long offset = endOffsets.get(tp); System.out.println(" partition "+ tp +" from "+ offset + "); consumer.seek(tp, offset); } // Execute the poll() method again to consume the pulled data. / /... (omitted)Copy the code

SeekToBeginning () and seekToEnd() are provided directly in KafkaConsumer. Specific definitions are as follows:

public void seekToBeginning(Collection<TopicPartition> partitions)
public void seekToEnd(Collection<TopicPartition> partitions) 
Copy the code

The alternative code is as follows:

Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(assignment); for (TopicPartition tp : assignment) { Long offset = beginOffsets.get(tp); System.out.println(" partition "+ tp +" from "+ offset + "); consumer.seek(tp, offset); }Copy the code
3.4.5 Consumption by timestamp

For example, if we want to consume messages from the day before yesterday at this time, we can’t trace them directly to this location, so we can use KafkaConsumer’s offsetsForTimes method

public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
Copy the code

The offsetsForTimes() parameter timestampsToSearch is a Map, where key is the partition to be queried, value is the timestamp to be queried, This method returns offset and timestamp corresponding to the first message whose timestamp is greater than or equal to the query time.

For example, consume messages from one day after the current time as follows:

Set<TopicPartition> assignment = new HashSet<>(); while (assignment.size() == 0) { consumer.poll(100); assignment = consumer.assignment(); } Map<TopicPartition, Long> timestampToSearch = new HashMap<>(); for (TopicPartition tp : Timestamptosearch.put (tp, system.currentTimemillis () -24 * 3600 * 1000); } Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timestampToSearch); for(TopicPartition tp: assignment){ OffsetAndTimestamp offsetAndTimestamp = offsets.get(tp); If (offsetAndTimestamp! = null) { consumer.seek(tp, offsetAndTimestamp.offset()); } } while (true) { ConsumerRecords<String, String> records = consumer.poll(100); For (ConsumerRecord<String, String> record: records) { System.out.println(record.offset() + ":" + record.value() + ":" + record.partition() + ":" + record.timestamp()); }}Copy the code

Fourth, control or close consumption

KafkaConsumer provides a way to control the consumption rate. At some point, we may close or pause a consumption from one partition and resume consumption from another partition when certain conditions are reached. These methods are pause() and resume().

public void pause(Collection<TopicPartition> partitions) { this.acquireAndEnsureOpen(); try { this.log.debug("Pausing partitions {}", partitions); Iterator var2 = partitions.iterator(); while(var2.hasNext()) { TopicPartition partition = (TopicPartition)var2.next(); this.subscriptions.pause(partition); } } finally { this.release(); }}Copy the code
public void resume(Collection<TopicPartition> partitions) { this.acquireAndEnsureOpen(); try { this.log.debug("Resuming partitions {}", partitions); Iterator var2 = partitions.iterator(); while(var2.hasNext()) { TopicPartition partition = (TopicPartition)var2.next(); this.subscriptions.resume(partition); } } finally { this.release(); }}Copy the code

In addition to pausing and resuming, Kafka also provides a lunch paused() method to return a paused collection of partitions.

public Set<TopicPartition> paused()
Copy the code

Rebalance

Rebalancing refers to the transfer of ownership of a partition from one consumer to another. For example, when new consumers are added, rebalancing can result in a redistribution of partitions and consumers, providing high availability and scalability for consumer groups.

When rebalancing occurs, consumers in the consumer group are not able to read the message, that is, consumers become unavailable during the short period during which rebalancing occurs. In addition, rebalancing can cause message duplication, because when a partition is assigned to another consumer, the state of the consumer will be lost, and the new consumer will start consuming from the original shift before the consumption shift can be synchronized. Therefore, rebalancing should be avoided as much as possible.

We can pass in a custom partition rebalancing listener using the overload method subscribe

/* Subscribe (Collection<String> topics, ConsumerRebalanceListener listener) / * need to subscribe to the theme of using regular matching * / subscribe (the Pattern the Pattern, ConsumerRebalanceListener listener)Copy the code

The code is as follows:

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>(); Consumer. The subscribe (Collections. SingletonList (topic), new ConsumerRebalanceListener () {/ * this method will after consumers stop reading the news, */ @override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { System.out.println(" rebalancing is about to trigger "); Consumer.mitsync (currentOffsets); Currentoffsets.clear (); } /* This method will redistribute the partition, */ @override public void onPartitionsAssigned(Collection<TopicPartition> partitions) {}}); try { while (true) { ConsumerRecords<String, String> records = consumer.poll(Duration.of(100, ChronoUnit.MILLIS)); for (ConsumerRecord<String, String> record : records) { System.out.println(record); TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition()); OffsetAndMetadata offsetAndMetadata = new OffsetAndMetadata(record.offset() + 1, "no metaData"); //TopicPartition overwrites hashCode and equals methods, CurrentOffsets. Put (topicPartition, offsetAndMetadata); } consumer.commitAsync(currentOffsets, null); } } finally { consumer.close(); }Copy the code

Message shifts are temporarily stored in the local variable currentOffsets in the code, which can be submitted asynchronously during normal consumption, but the onPartitionsRevoked callback function is submitted synchronously before the rebalancing action occurs to avoid repeated consumption of rebalancing.

6. Interceptor

Like the producer client interceptor mechanism, the consumer client defines the interceptor logic. The custom interceptor logic is implemented by implementing ConsumerInterceptor, which has three main methods:

  • Public ConsumerRecords

    onConsume(ConsumerRecords

    records) The consumer calls this method before the poll method returns, To customize messages, such as modifying message content and filtering messages according to certain rules.
    ,>
    ,>
  • Public void onCommit(Map

    offsets) The consumer calls this method after the commit shift.
    ,>
  • Public void close() : Close
public class ConsumerInterceptorPrefix implements ConsumerInterceptor<String,String> { @Override public ConsumerRecords<String,String> onConsume(ConsumerRecords<String,String> consumerRecords) { Map<TopicPartition, List<ConsumerRecord<String, String>>> newRecords = new HashMap<>(); for (TopicPartition partition : consumerRecords.partitions()) { List<ConsumerRecord<String, String>> recs = consumerRecords.records(partition); List<ConsumerRecord<String, String>> newRecs = new ArrayList<>(); for(ConsumerRecord<String,String> rec:recs){ String newValue = "xiaolei-"+rec.value(); ConsumerRecord<String,String> newRec = new ConsumerRecord<>(rec.topic(), rec.partition(),rec.offset(),rec.key(),newValue); newRecs.add(newRec); } newRecords.put(partition,newRecs); } return new ConsumerRecords<>(newRecords); } @Override public void close() { } @Override public void onCommit(Map<TopicPartition, OffsetAndMetadata> map) { map.forEach((tp,offsetAndMetadata) -> { System.out.println(tp+" : "+offsetAndMetadata.offset()); }); } @Override public void configure(Map<String, ? > map) { } }Copy the code

Add interceptors to the configuration class

props.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, ConsumerInterceptorPrefix.class.getName());
Copy the code

7. Important consumer parameters

7.1 the fetch. Min. Bytes

This property specifies the minimum number of bytes for the consumer to retrieve records from the server. If the amount of data available is less than the size specified in fetch.min.bytes when the broker receives a data request from a consumer, it will wait until there is enough data available before returning it to the consumer. This reduces the workload for consumers and brokers because they do not need to process messages back and forth when the topic is not very active. If you don’t have a lot of data available, but the consumer has high CPU usage, you can set the value of this property to a larger value than the default. If the number of consumers is large, setting the value of this attribute to a larger value can reduce the broker’s workload.

7.2 the fetch. Max. Wait. Ms

This property specifies how long the broker should wait for messages to return. The default is 500ms. If there is not enough data flowing into Kafka, the minimum amount of data required by consumers is not met, resulting in a 500ms delay. If you want to reduce the potential latency (to meet the SLA), you can set this parameter to a smaller value. If fetch.max.wait.ms is set to 100ms and fetch.min.bytes is set to 1MB, Kafka will return either 1MB of data or available data after 100ms upon receiving a consumer request, as soon as either condition is met.

7.3 Max. Partition. The fetch bytes

This property specifies the maximum number of bytes the server can return to the consumer from each partition. Its default value is 1MB. KafkaConsumer. The poll () method returns the records from each partition is no more than Max. Partition.. Fetch bytes specified byte. If a topic has 20 partitions and 5 consumers, each consumer needs at least 4MB of free memory to receive records. When allocating memory to consumers, you can allocate more to them because if a consumer in the group crashes, the remaining consumers need to handle more partitions.

. Max. Partition. The fetch bytes than broker value must be able to receive the biggest news of the number of bytes (Max. Message. The size), otherwise they may not be able to read the message, lead to consumers have been hang, try again.

When setting this value, you also need to take into account the time consumers spend processing data. Consumers need to call the poll() method frequently to avoid session expiration and partition rebalancing. If a single call to poll() returns too much data, consumers need more time to process and may not be able to conduct the next poll in time to avoid session expiration. Appear this kind of circumstance, can put the Max. Partition.. Fetch bytes change little, or extended the session expiration time.

7.4 the session. A timeout. Ms

The value of this property specifies how long the consumer can be disconnected from the server before being considered dead. The default is 3s. If a consumer does not send a heartbeat to the group coordinator within the time specified by session.timeout.ms, it is considered dead, and the coordinator triggers rebalancing to allocate its partitions to other consumers in the group. Heartbeat.interval.ms specifies how often the poll() method sends the heartbeat to the coordinator, and session.timeout.ms specifies how long the consumer can go without sending the heartbeat. Therefore, both properties need to be modified at the same time. Heartbeat.interval. ms must be smaller than session.timeout.ms, which is one third of session.timeout.ms.

Session.timeout. ms low: Crashed nodes can be detected and recovered more quickly, although prolonged polling or garbage collection can lead to unintended rebalancing.

Session.timeout. ms increase: This reduces accidental rebalancing, but it takes longer to detect node crashes.

7.5 auto. Offset. The reset

This property specifies what the consumer should do if a partition is read without an offset or if the offset is invalid (the record containing the offset is obsolete and deleted because the consumer has been invalid for a long time). The default value is latest, and if the offset is invalid, the consumer will start reading from the most recent record (the record generated after the consumer started). The other value is earliest, in which case the offset is invalid, the consumer will read the partition’s record from the starting position.

Enable.auto.com 7.6 MIT

This property specifies whether the consumer automatically submits the offset. The default value is true. To minimize duplicates and data loss, you can set it to false and control when you commit offsets. If set to true, you can also control the frequency of submissions by configuring the auto.mit.interval. ms property.

7.7 partition. The assignment. The strategy

Partitions are assigned to consumers in the group. PartitionAssignor determines which partitions should be assigned to which consumer based on a given consumer and topic. Kafka has two default allocation policies

Range(default): This policy assigns consumers contiguous partitions of a topic. Suppose consumers C1 and C2 subscribe to both topic T1 and topic T2, and each topic has three partitions. Then consumer C1 is likely to be assigned to partition 0 and partition 1 of these two topics, four partitions; And consumer C2 is assigned to partition 2 of these two topics, two partitions. Because each topic has an odd number of partitions, and the allocation is done independently within the topic, the first consumer ends up with more partitions than the second consumer. This happens whenever the Range policy is used and the number of partitions is not divisible by the number of consumers.

org.apache.kafka.clients.consumer.RangeAssignor

RoundRobin: This policy assigns all partitions of a topic to consumers one by one. If the RoundRobin policy is used to partition consumer C1 and consumer C2, consumer C1 will be divided into partitions 0 and 2 of topic T1 and 1 of topic T2. Consumer C2 will be assigned to partition 1 of topic T1 and partitions 0 and 2 of principal T2. In general, if all consumers subscribe to the same topic, the RoundRobin policy assigns the same number of partitions to all consumers (up to one partition difference).

org.apache.kafka.clients.consumer.RoundRobinAssignor

7.8 the client id

This property can be any string that the broker uses to mark messages sent from the client. It is often used in logs, metrics, and quotas.

7.9 Max. Poll. Records

This property controls the number of records that can be returned by a single call to the poll() method and controls the amount of data that needs to be processed in the poll.

. 7.10 the receive buffer, bytes and the send buffer. The bytes

The size of the TCP buffer used by the socket for reading and writing data can also be set. If they are set to -1, the operating system defaults are used. If the producer or consumer is in a different data center than the broker, you can increase these values appropriately because networks across data centers tend to have higher latency and lower bandwidth.

References:

This article takes 7700 words, after a day of learning summary, welcome to like.

  • In-depth Understanding of Kafka’s core Design and Practice