Each time the KafkaConsumer calls poll(), it returns records that were written to Kafka but not yet read by the consumers in the group, so we can track which records were read by which consumer. As discussed earlier, Kafka does not track acknowledgments from consumers the way many JMS queues do, which is a distinctive feature of Kafka. Instead, consumers use Kafka itself to track their position (offset) in each partition.
We call the action of updating the current position in a partition a commit.
So how does a consumer commit an offset? It sends a message to a special topic called __consumer_offsets containing the committed offset for each partition. As long as the consumer keeps running, these committed offsets have little effect. However, if a consumer crashes or a new consumer joins the group, a rebalance is triggered, after which each consumer may be assigned partitions other than the ones it was processing before. To be able to continue, the consumer reads the last committed offset of each partition and resumes processing from there.
If the committed offset is smaller than the offset of the last message processed by the client, the messages between the two offsets will be processed again, as shown in Figure 4-6.
Automatic commit
The simplest way to commit is to let the consumer commit offsets automatically. If enable.auto.commit is set to true, the consumer automatically commits the largest offsets returned by the poll() method every five seconds. The commit interval is controlled by auto.commit.interval.ms and defaults to 5s. Like everything else in the consumer, automatic commits are driven by poll(): each time the consumer polls, it checks whether it is time to commit, and if so, it commits the offsets returned by the previous poll.
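As a minimal sketch (the broker address, group id, and deserializers below are placeholders, not values from the text), turning on auto-commit is purely a configuration change:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092");              // placeholder broker
props.put("group.id", "CountryCounter");                     // placeholder group id
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
props.put("enable.auto.commit", "true");                     // commit offsets automatically
props.put("auto.commit.interval.ms", "5000");                // every 5 seconds (the default)

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```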
However, before using this convenient option, you need to understand its consequences. Suppose we use the default 5s commit interval and a rebalance occurs 3s after the last commit. After the rebalance, consumers start reading from the last committed offset, which at that point is 3s old, so all messages that arrived during those 3s are processed twice. You can commit more frequently by shortening the commit interval to reduce the window in which duplicates can occur, but duplicates cannot be avoided entirely.
With auto-commit enabled, each call to poll() commits the offsets returned by the previous call. The consumer has no idea which messages have actually been processed, so it is important to finish processing all the messages returned by the previous poll() before calling poll() again (calling close() also commits offsets automatically before the consumer shuts down). This is usually not a problem, but be careful when handling exceptions or exiting the poll loop early.
While auto-commit is convenient, it doesn’t leave developers the leeway to avoid reprocessing messages.
Commit the current offset
Most developers take control of when offsets are committed, both to eliminate the possibility of missing messages and to reduce the number of duplicates during a rebalance. The consumer API provides another way of committing offsets that lets developers commit the current offset at whatever point makes sense, rather than on a timer.
Disable auto-commit by setting enable.auto.commit to false, and the application decides when to commit offsets. commitSync() is the simplest and most reliable of the commit APIs: it commits the latest offsets returned by the poll() method, returns as soon as the commit succeeds, and throws an exception if the commit fails.
Keep in mind that commitSync() commits the latest offsets returned by poll(), so make sure you call commitSync() only after you have processed all the records in the batch, or you still risk missing messages. If a rebalance is triggered, all the messages from the beginning of the most recent batch until the moment of the rebalance will be processed twice.
Here is an example where we commit offsets using the commitSync() method after processing the most recent batch of messages.
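The loop below is a sketch of that pattern, reusing the consumer from the earlier configuration snippet; the record handling is just a printout, and the poll timeout and exception handling are assumptions rather than the book's exact listing:

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.*;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        // "Processing" here is just printing the record.
        System.out.printf("topic=%s, partition=%d, offset=%d, value=%s%n",
                record.topic(), record.partition(), record.offset(), record.value());
    }
    try {
        // Commit only after the whole batch returned by poll() has been handled.
        consumer.commitSync();
    } catch (CommitFailedException e) {
        // For example, a rebalance may have completed while we were processing.
        System.err.println("commit failed: " + e);
    }
}
```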
Asynchronous commit
One disadvantage of synchronous commit is that the application blocks until the broker responds to the commit request, limiting the throughput of the application. We can increase throughput by reducing the commit frequency, but if rebalancing occurs, it will increase the number of duplicate messages.
The asynchronous commit API can help here: we simply send the commit request and carry on without waiting for the broker to respond.
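In its simplest form (again a sketch that reuses the consumer from above and a hypothetical handle() method for per-record processing), an asynchronous commit looks like this:

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        handle(record);          // hypothetical per-record processing
    }
    // Fire and forget: send the commit request without waiting for a reply.
    consumer.commitAsync();
}
```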
The drawback is that while commitSync() retries a failed commit until it either succeeds or hits an unrecoverable error, commitAsync() does not retry: by the time it receives the broker's response, a later commit may already have succeeded, and retrying the older one could overwrite a larger offset with a smaller one, causing duplicates after a rebalance. We mention this complexity and the importance of commit order because commitAsync() also accepts a callback, which is executed when the broker responds. Callbacks are commonly used to log commit errors or to record metrics, but if you use them for retries, be sure to pay attention to the commit order.
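A sketch of such a callback, used purely for logging and under the same assumptions as the previous snippets:

```java
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;

consumer.commitAsync(new OffsetCommitCallback() {
    @Override
    public void onComplete(Map<TopicPartition, OffsetAndMetadata> offsets, Exception e) {
        if (e != null) {
            // Just record the failure; blindly retrying here could commit an
            // older offset on top of a newer one.
            System.err.printf("commit failed for offsets %s: %s%n", offsets, e);
        }
    }
});
```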
Retry asynchronous commit
We can use a monotonically increasing sequence number to keep asynchronous commit retries in order. Increase the sequence number every time you commit, and capture its value at commit time in the commitAsync() callback. Before retrying, check whether the sequence number captured by the callback still equals the current value: if it does, no newer commit has been sent and it is safe to retry; if the current value is larger, a newer commit has already been sent and the retry should be dropped.
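Here is a minimal sketch of that pattern; the commitWithRetry() helper and the commitSeq field are hypothetical names introduced for the example, not part of the consumer API. Since commit callbacks run on the polling thread, calling the helper again from inside the callback is safe:

```java
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical field: a monotonically increasing commit sequence number.
final AtomicLong commitSeq = new AtomicLong();

void commitWithRetry(KafkaConsumer<String, String> consumer) {
    final long seqAtSend = commitSeq.incrementAndGet();   // bump on every commit attempt
    consumer.commitAsync((offsets, exception) -> {
        if (exception != null && seqAtSend == commitSeq.get()) {
            // No newer commit was sent after this one, so retrying cannot
            // overwrite a larger offset with a smaller one.
            commitWithRetry(consumer);
        }
        // Otherwise a newer commit is already in flight; drop this retry.
    });
}
```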
Combining synchronous and asynchronous commits
In general, for the occasional failed commit, not retrying is not a big deal, because if the failed commit is due to a temporary problem, subsequent commits will always succeed. But if this is the last commit before closing the consumer or rebalancing, make sure the commit succeeds.
Therefore, a common pattern is to combine commitAsync() during normal operation with commitSync() just before the consumer closes. Here is how that works (we'll discuss how to commit offsets just before a rebalance when we get to rebalance listeners later):
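The skeleton below illustrates the pattern; the closing flag and the handle() method are assumptions added for the sketch:

```java
try {
    while (!closing) {                       // hypothetical shutdown flag
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            handle(record);                  // hypothetical per-record processing
        }
        // Fast, non-blocking commit while everything is healthy; an occasional
        // failure here will be corrected by a later commit.
        consumer.commitAsync();
    }
} finally {
    try {
        // Last commit before shutting down: block and retry until it succeeds
        // or fails with an unrecoverable error.
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
```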
Commit a specific offset
Committing the latest offset only lets you commit as often as you finish processing a batch of messages. But what if you want to commit more frequently than that? What if poll() returns a huge batch and you want to commit offsets in the middle of it, to avoid reprocessing the whole batch if a rebalance occurs? You can't simply call commitSync() or commitAsync(), because they commit the last offset returned by poll(), which includes messages you haven't processed yet.
Fortunately, the consumer API lets you pass a map of the partitions and offsets you want to commit when calling commitSync() and commitAsync(). Suppose you are halfway through a batch and the last message you processed from partition 3 of the topic "customers" has offset 5000; you can call commitSync() to commit exactly that. However, because a consumer may be reading from more than one partition, you have to track offsets for all of them, and controlling offsets at this level makes the code more complex.
Here is an example of submitting a specific offset:
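The following sketch commits every 1,000 processed records, even in the middle of a batch; the 1,000-record threshold and the handle() method are assumptions:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();
int count = 0;

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        handle(record);                                    // hypothetical processing
        // Record the position *after* this message for its partition.
        currentOffsets.put(new TopicPartition(record.topic(), record.partition()),
                new OffsetAndMetadata(record.offset() + 1));
        if (++count % 1000 == 0) {
            // Commit the tracked offsets; a callback could be passed instead of null.
            consumer.commitAsync(currentOffsets, null);
        }
    }
}
```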
Rebalance listeners
As mentioned in the section on committing offsets, a consumer will want to do some cleanup work before exiting and before a partition rebalance.
Before the consumer loses ownership of a partition, you will want to commit the offset of the last record it processed. If the consumer maintains a buffer of events that it processes only occasionally, the records accumulated in the buffer should be processed before ownership of the partition is lost. You may also need to close file handles, database connections, and so on.
The consumer API lets you run your own code when partitions are assigned to or removed from the consumer: you pass a ConsumerRebalanceListener instance when calling the subscribe() method discussed earlier. ConsumerRebalanceListener has two methods you need to implement.
(1) public void onPartitionsRevoked(Collection<TopicPartition> partitions) is invoked before the rebalance starts and after the consumer has stopped reading messages. If you commit offsets here, the consumer that takes over the partition will know where to start reading.
(2) public void onPartitionsAssigned(Collection<TopicPartition> partitions) is called after partitions have been reassigned and before the consumer starts reading messages.
The following example demonstrates how to commit offsets in the onPartitionsRevoked() method before ownership of a partition is lost. In the next section, we'll show a more involved example that also uses onPartitionsAssigned().
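A sketch of such a listener, assuming a currentOffsets map maintained by the processing loop (as in the earlier snippet) and a placeholder topic name:

```java
import java.util.Collection;
import java.util.Collections;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;

private Map<TopicPartition, OffsetAndMetadata> currentOffsets = new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Nothing to do when partitions are assigned in this sketch.
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // About to lose these partitions: commit what has been processed so far
        // so the next owner knows where to start.
        System.out.println("Lost partitions in rebalance, committing: " + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

// Register the listener when subscribing ("customerCountries" is a placeholder topic).
consumer.subscribe(Collections.singletonList("customerCountries"), new HandleRebalance());
```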
Processing records starting from a specific offset
So far, we've seen how to use poll() to start consuming messages from the last committed offset in each partition. However, sometimes we want to start reading from a different, specific offset.
If you want to start reading from the beginning of a partition, or skip all the way to the end and read only new messages, use seekToBeginning(Collection<TopicPartition> tp) and seekToEnd(Collection<TopicPartition> tp).
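For example (assuming the consumer already has an assignment):

```java
// Rewind all currently assigned partitions to their earliest available offsets...
consumer.seekToBeginning(consumer.assignment());
// ...or jump ahead so that only newly arriving messages are read.
consumer.seekToEnd(consumer.assignment());
```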
However, Kafka also provides an API for seeking to a specific offset. It has many uses, such as going back a few messages or skipping ahead a few messages (a time-sensitive application might want to skip ahead if its processing is lagging). It becomes especially powerful when offsets are stored in a system other than Kafka.
Imagine a scenario where an application reads events from Kafka (perhaps a stream of click events from a website), processes them (perhaps cleaning out clicks generated by automated programs and adding session information), and then saves the results to a database, a NoSQL store, or Hadoop. Suppose we really don't want to lose any data, and we don't want to store the same results in the database twice either.
In this case, the consumer’s code might look like this:
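Roughly like this (processRecord(), storeRecordInDB(), and the EnrichedRecord type are hypothetical stand-ins for the application logic). Note that even committing after every record leaves a window between the database write and the offset commit in which a crash causes the record to be reprocessed:

```java
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (ConsumerRecord<String, String> record : records) {
        EnrichedRecord result = processRecord(record);   // hypothetical enrichment step
        storeRecordInDB(result);                          // hypothetical database write
        consumer.commitAsync();                           // commit after every record
    }
}
```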
This could be avoided if storing the record and committing the offset were a single atomic operation: either both succeed, or neither does. But as long as records are written to a database and offsets are committed to Kafka, that atomicity is impossible.
However, what if we wrote both the record and its offset to the database in the same transaction? Then we would know that either both were committed or neither was, in which case the record will simply be reprocessed.
The question now is: if the offsets are stored in a database rather than in Kafka, how does the consumer know where to start reading when it is assigned a new partition? This is what the seek() method is for. When the consumer starts or is assigned new partitions, it can look up the offsets in the database and seek() to those positions.
The following example outlines how to use this API: it uses a ConsumerRebalanceListener and seek() to make sure we start processing messages from the offsets stored in the database.
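A skeleton of that idea; commitDBTransaction() and getOffsetFromDB() are hypothetical helpers for whatever database layer the application uses:

```java
import java.util.Collection;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class SaveOffsetsOnRebalance implements ConsumerRebalanceListener {
    private final KafkaConsumer<String, String> consumer;

    public SaveOffsetsOnRebalance(KafkaConsumer<String, String> consumer) {
        this.consumer = consumer;
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        // Flush the database transaction that holds both the processed records
        // and their offsets before giving up the partitions.
        commitDBTransaction();                              // hypothetical helper
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // Position the consumer at the offset stored in the database for
        // each newly assigned partition.
        for (TopicPartition partition : partitions) {
            consumer.seek(partition, getOffsetFromDB(partition));   // hypothetical helper
        }
    }
}
```

In the processing loop, the application would then write each record and its offset to the database inside one transaction and commit that transaction, instead of calling commitSync() or commitAsync().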
How to quit
As mentioned earlier in the discussion of polling, you don't need to worry about the consumer polling for messages in an infinite loop; here we show how to exit the loop gracefully.
If you decide to exit the loop, call consumer.wakeup() from another thread; if the loop is running in the main thread, this can be done from a ShutdownHook. Keep in mind that consumer.wakeup() is the only consumer method that is safe to call from another thread. Calling it causes poll() to exit by throwing a WakeupException; if the thread is not waiting in poll() when consumer.wakeup() is called, the exception is thrown on the next call to poll(). We don't need to handle the WakeupException in any special way, because it is just a signal to leave the loop. However, before exiting the thread you must call consumer.close(), which commits any offsets that haven't been committed yet and tells the group coordinator that the consumer is leaving the group, so a rebalance is triggered immediately instead of waiting for the session to time out.
Below is what the exit code looks like when the consumer loop runs in the main thread.
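A sketch under the same assumptions as before (placeholder topic name, a simple printout as the "processing" step); the shutdown hook calls wakeup() and then waits for the main thread to finish:

```java
import java.util.Collections;
import org.apache.kafka.common.errors.WakeupException;

final Thread mainThread = Thread.currentThread();

// The shutdown hook runs in a separate thread, so wakeup() is the only
// consumer method it may safely call.
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    System.out.println("Starting exit...");
    consumer.wakeup();
    try {
        mainThread.join();               // wait for the poll loop to finish cleanly
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}));

try {
    consumer.subscribe(Collections.singletonList("customerCountries"));  // placeholder topic
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset=%d, value=%s%n", record.offset(), record.value());
        }
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // Expected on shutdown: wakeup() was called, so just fall through to close().
} finally {
    consumer.close();   // commits offsets and tells the group coordinator we are leaving
    System.out.println("Closed consumer; we are done.");
}
```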