There are many articles on the web about Kafka’s log retention strategy. However, the strategy has changed over time, so this article gives a unified discussion for newer versions of Kafka. Unless explicitly stated otherwise, Kafka 1.0.0 is the subject of analysis in this article.

Log retention strategies are Kafka’s rules for saving topic data, and I’ll describe them in two parts:

Retention strategy types

Retention mechanics and how they work

I. Retention strategy types

Currently, there are two types of policies related to log retention: delete and compact. The mechanics of these two forms of retention are completely different, and this article focuses on retention strategies of the delete type. Users can specify the default policy type for all topics in the cluster by setting the broker-side parameter log.cleanup.policy, and can use the topic-level parameter cleanup.policy to set a different policy type for individual topics. Since 0.10.1.0, log.cleanup.policy has been a list-type parameter that can be set to [delete,compact], meaning that both the delete and compact retention policies are enabled for a topic. Prior to 0.10.1.0, this parameter could only select one of the two policies. In practice, however, many users complained that expired messages in compacted topics were never deleted, so the community changed this parameter to allow both retention policies to be enabled for the same topic.
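As a concrete illustration, here is a minimal sketch of how these parameters might be set; the topic name test is a placeholder, and the kafka-configs.sh form assumes a 1.0.0-era deployment where topic configs are altered through ZooKeeper:

# server.properties: cluster-wide default policy
log.cleanup.policy=delete

# topic-level override for a hypothetical topic named "test"
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config cleanup.policy=compact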

Again, this article only discusses retention strategies of the delete type.

II. Retention mechanics and how they work

Before we dive into the details of the various retention mechanisms, let’s talk briefly about how Kafka handles log retention. When each Kafka broker starts up, it launches a scheduled background task that periodically checks and enforces log retention for all topics. The interval of this task is controlled by the broker-side parameter log.retention.check.interval.ms, which defaults to 5 minutes; that is, every 5 minutes each broker checks whether there are log segments that can be deleted. If you want to shorten the interval, simply lower log.retention.check.interval.ms.
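For example, to have the check run every minute instead, one could add something like the following to server.properties (the 60000 ms value is purely illustrative):

# check for deletable log segments every 60 seconds rather than every 5 minutes
log.retention.check.interval.ms=60000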

Since log retention and log deletion are really two sides of the same question, let’s discuss what rules Kafka uses to delete logs. It is important to note that what gets deleted are log segments, i.e., individual files ending in .log, not the entire partition folder. It is also important to note that the active log segment is never deleted, regardless of which retention mechanism the user has configured.

There are currently three retention mechanisms:

Based on the space dimension

Based on the time dimension

Based on the log start offset dimension

Most readers are probably familiar with the first two strategies, while the third is rarely covered online because it was added only recently. Let’s go through them one by one.

2.1 Based on the space dimension

Also known as size-based retention, this means that Kafka periodically deletes log segments for topics whose logs exceed a disk space threshold. This threshold is controlled by the broker-side parameter log.retention.bytes and the topic-level parameter retention.bytes. The default value is -1, which indicates that Kafka does not enable this retention mechanism by default. If you want to enable it, you must explicitly set log.retention.bytes (or retention.bytes).
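A minimal sketch of enabling size-based retention (the 2GB value and the topic name test are illustrative assumptions):

# server.properties: delete old segments once a partition log grows past ~2GB
log.retention.bytes=2147483648

# or override for a single hypothetical topic named "test"
bin/kafka-configs.sh --zookeeper localhost:2181 --alter --entity-type topics --entity-name test --add-config retention.bytes=2147483648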

Once the threshold is set, the scheduled task checks whether the current total log size exceeds the threshold by at least the size of one log segment. The total size here refers to the size of all log segment files only, excluding index files! If it does, Kafka tries to delete segments starting from the oldest log segment file. Pay attention to the phrase “exceeds the threshold by at least one log segment size”: the portion above the threshold must be larger than the size of one log segment, otherwise nothing is deleted. The reason is that deletion is marked at the level of log files, and a file can only be deleted as a whole, never partially.

For example, suppose the log segment size is 700MB and the current partition has four log segment files of 700MB, 700MB, 700MB, and 1234B; clearly the 1234B file is the active log segment. The total log size of the partition is then 3 x 700MB + 1234B = 2100MB + 1234B. If the threshold is set to 2000MB, the portion exceeding the threshold is 100MB + 1234B, which is smaller than one log segment (700MB), so Kafka deletes nothing even though the total size exceeds the threshold. If, however, the threshold is set to 1400MB, the portion exceeding the threshold is 700MB + 1234B, which is greater than one log segment (700MB), so Kafka deletes the oldest segment file.

2.2 Based on the time dimension

Also known as time-based retention, this means that Kafka periodically deletes log segments for topics that have exceeded a time threshold. This threshold is controlled by the broker-side parameters log.retention.ms, log.retention.minutes, and log.retention.hours. If all three are set at the same time, log.retention.ms takes the highest priority, followed by log.retention.minutes, then log.retention.hours. The default values of these three parameters are null, null, and 168 respectively, so by default Kafka keeps logs for 7 days per topic.
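For instance, to shorten retention to 3 days, either of the following lines in server.properties would work; if both were present, the ms-based setting would win because it has the highest priority (the 3-day value is purely illustrative):

# keep logs for 3 days
log.retention.hours=72

# equivalent, but expressed in milliseconds and taking precedence over the line above
log.retention.ms=259200000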

Here we need to discuss how “7 days” is defined. Prior to 0.10.0.0, every time Kafka checked, it compared the current time to the last modification time of each log segment file, and if the difference between the two exceeded the threshold set above (say, 7 days), Kafka would attempt to delete the file. However, this definition is problematic because the last modification time of a file is mutable: for example, running the touch command on a log segment file, or having a partition replica reassignment recreate the files, can change the last modification time and thus confuse the rule. Therefore, since version 0.10.0.0, Kafka has included a timestamp field in the message format (not only to fix this problem, of course) and maintains a maximum timestamp for each log segment file. Expiration is now determined by comparing the current time with this maximum timestamp. The advantage of using the maximum timestamp is that it is transparent to the user and cannot be modified directly from outside, so the judgment can no longer be confused.

The update mechanism for the maximum timestamp field is also simple: each time a new message is written to the log segment, Kafka attempts to update the field. Because message timestamps are generally monotonically increasing, the maximum timestamp is usually updated with each write. Once a log segment is full and rolled, it no longer receives new messages, and its maximum timestamp stays unchanged. If the gap between that timestamp and the current time exceeds the threshold, the log segment file is deleted.
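If you are curious about the timestamps Kafka has recorded for a segment, you can dump its time index with the DumpLogSegments tool; a minimal sketch, where the file path is a hypothetical partition directory:

# print the (timestamp, offset) entries of a segment's time index (path is hypothetical)
bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files /tmp/kafka-logs/test-0/00000000000000000000.timeindex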

2.3 Based on the log start offset dimension

While the first two retention mechanisms are fairly familiar to users, let’s discuss the third: log start offset. This is a new feature introduced in version 0.11.0.0, originally added for Kafka stream processing applications: such applications produce many intermediate messages that may already have been processed but still sit in the topic log and take up a lot of disk space. Deleting these messages with the time-based mechanism would require a very small time threshold, which risks deleting messages before downstream operators have processed them; a large threshold, on the other hand, greatly increases the space footprint. Therefore, the community introduced a third retention mechanism in 0.11.0.0: retention based on log start offset.

The log start offset is the current start offset of the partition’s log. Note that this is a partition-level value, not a log-segment-level one, so each partition maintains exactly one log start offset. This value is initialized to the base offset of the oldest log segment, and as log segments are deleted it is updated to the base offset of the then-oldest log segment. In addition, Kafka provides a script, kafka-delete-records.sh, to help users set the log start offset of a specified partition.

This retention mechanism is enabled by default and requires no user configuration. Kafka performs the following check for each log segment A: 1. get the base offset of the next log segment B; 2. if that base offset is no larger than the partition’s current log start offset, delete log segment A.

Suppose a partition currently has five log segment files, a1.log, a2.log, a3.log, a4.log, and a5.log, where a5.log is the active log segment. The message ranges in the five log segment files are 0 to 9999, 10000 to 19999, 20000 to 29999, 30000 to 39999, and 40000 to 43210 (a5 is not full) respectively. What if I am convinced that the messages in the first three log segment files have been processed and want to delete them? Since I can’t predict how fast these log segments are produced or consumed, neither time-based deletion nor size-based deletion works here. At this point, I can use the kafka-delete-records.sh script to set the partition’s log start offset to a4.log’s base offset, which is 30000. To do this, I first need to create a JSON file, a.json, which looks like this:

{"partitions": [{"topic": "test", "partition": 0, "offset": 30000}], "version": 1}

Then execute the following command:

bin/kafka-delete-records.sh --bootstrap-server localhost:9092 --offset-json-file a.json

If all is well, you should see output similar to this:

Executing records delete operation

Records delete operation completed:

partition: test-0 low_watermark: 30000

At this point, the log start offset of partition 0 of topic test has been manually adjusted to 30000, so in principle every log segment whose messages all have offsets below 30000 can be deleted. With this mechanism, users can implement more flexible retention strategies.
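To verify the result, one way is to query the partition’s earliest available offset, which should now equal the new log start offset; a minimal sketch using the GetOffsetShell tool, where --time -2 requests the earliest offset and test is the hypothetical topic from above:

# expected output similar to: test:0:30000
bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2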

These are the three retention mechanisms Kafka currently provides for the delete retention type. Perhaps the community will add more retention strategies in the future.