When Kafka creates a topic, you can specify the number of partitions. When the Producer sends a message, a load-balancing (partitioning) strategy assigns the message to one partition. The offsets of each partition are independent of one another.

For example, if the topic is foo and the number of partitions is 3, then after the consumer commits offsets, each of the three partitions has its own committed offset: partition 0 might be at offset 12 while partitions 1 and 2 are at 7 and 20 (the numbers are only illustrative).

By default, the consumer commits offsets automatically, once every 5 seconds. However, since automatic commits may lead to message loss, they are generally turned off in favor of manual commits. Manual commits are very flexible and subject to no restrictions: the broker blindly accepts whatever offset the consumer commits. That is, if we have actually consumed only 15 messages but manually commit offset 20, the broker assumes the first 20 messages have been consumed and reports no error.
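To see what this means in code, here is a minimal sketch that commits an arbitrary offset with the Map-based commitSync overload (the kafkaConsumer instance, topic name, and offset value are hypothetical):

// Suppose we have only processed 15 messages from partition 0 of "foo".
// The broker will still happily record 20 as this partition's committed offset.
TopicPartition partition = new TopicPartition("foo", 0);
kafkaConsumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(20)));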

Manual offset commit strategy

  1. Turn off automatic offset commits: enable.auto.commit=false, as shown in the configuration sketch below
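A minimal configuration sketch; the bootstrap server, group id, and deserializers are placeholder values:

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Turn off automatic offset commits; offsets will be committed manually
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, Object> kafkaConsumer = new KafkaConsumer<>(props);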

Manual offset commit APIs

  1. Synchronous
void commitSync();

void commitSync(Duration timeout);

void commitSync(Map<TopicPartition, OffsetAndMetadata> offsets);

void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsets, final Duration timeout);

When commitSync() is called, the Consumer program blocks until the remote Broker returns the commit result. In any system, blocking caused by the program itself rather than by resource constraints can become a bottleneck that limits the TPS of the whole application.

We can lengthen the commit interval, but then the Consumer commits less frequently, and more messages will be re-consumed the next time the Consumer restarts.
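For reference, a minimal sketch of the blocking pattern (kafkaConsumer, handlerRecord, and errHandler are the same hypothetical helpers used in the examples below):

while (true) {
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
    handlerRecord(records);
    try {
        // Blocks until the broker responds; commitSync retries transient
        // failures itself and throws only on unrecoverable errors
        kafkaConsumer.commitSync();
    } catch (CommitFailedException e) {
        errHandler(e);
    }
}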

  2. Asynchronous
void commitAsync();

void commitAsync(OffsetCommitCallback callback);

void commitAsync(Map<TopicPartition, OffsetAndMetadata> offsets, OffsetCommitCallback callback);

commitAsync() returns immediately without blocking and therefore does not affect the TPS of the Consumer application. Because it is asynchronous, Kafka provides callback functions that let you run post-commit logic, such as logging or handling exceptions.
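For example, a sketch that logs a failed commit from the callback (the log instance is assumed):

kafkaConsumer.commitAsync((offsets, exception) -> {
    if (exception != null) {
        // Post-commit logic: record the failure; do not retry here, since a
        // later commit may already have superseded these offsets
        log.error("async commit failed for offsets {}", offsets, exception);
    }
});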

Can commitAsync replace commitSync?

The answer is no. The problem with commitAsync is that it does not automatically retry when a commit fails. Because the operation is asynchronous, by the time a retry ran, the offset it would resubmit might already be stale, superseded by a later commit. Retrying an asynchronous commit therefore makes little sense, so commitAsync does not retry.

Combining synchronous and asynchronous commits

In the normal path, commit asynchronously so throughput is not affected. On exception or shutdown, fall back to a synchronous commit and let its automatic retry policy retry for us. The code is as follows:

try {
    while (true) {
        ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
        handlerRecord(records);
        // Fast, non-blocking commit on the happy path
        kafkaConsumer.commitAsync();
    }
} catch (Exception e) {
    errHandler(e);
} finally {
    try {
        // Last chance to commit: synchronous, with automatic retries
        kafkaConsumer.commitSync();
    } finally {
        kafkaConsumer.close();
    }
}

Fine-grained offset commits

By default, a commit covers all the messages returned by the latest poll(). If we pull a large batch at once, we can commit offsets in stages, just like splitting a large transaction into several small ones to execute one at a time. This reduces the amount of reprocessing needed to recover from an error.

Kafka’s offset management is isolated per partition, so we can track the processing progress of each partition and commit its offset separately.

The specific code is as follows:

Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
AtomicInteger count = new AtomicInteger();
try {
    ConsumerRecords<String, Object> records = kafkaConsumer.poll(Duration.ofMillis(100));
    records.forEach(record -> {
        handlerRecord(record);
        // The committed offset is the position of the next message to read,
        // hence offset + 1
        offsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        count.getAndIncrement();
        // Commit once for every 100 messages processed
        if (count.get() % 100 == 0) {
            kafkaConsumer.commitAsync(offsets, null);
        }
    });
} catch (Exception e) {
    errHandler(e);
} finally {
    try {
        kafkaConsumer.commitSync();
    } finally {
        kafkaConsumer.close();
    }
}