This article is from OPPO's Internet Technology team. You are welcome to follow our official account, OPPO_tech, where we share OPPO's cutting-edge Internet technology and activities.

Kafka is a distributed publish-subscribe messaging system with high throughput, high fault tolerance, high reliability, and high performance. It is mainly used for application decoupling, traffic peak shaving, asynchronous messaging, and similar scenarios.

To give you a deeper understanding of how Kafka is implemented internally, this article starts with topics and logs (how messages are stored, deleted, and retrieved), then introduces the replica mechanism, and finally covers how production and consumption are implemented, so that Kafka can be applied more sensibly in real business scenarios.

This article is long, so you may want to like it first and then read it slowly :)

1. Introduction

Kafka is a distributed publish-subscribe messaging system with powerful message processing capabilities. Compared with other messaging systems, it has the following features:

  • Fast persistence: data is persisted with O(1) time complexity.
  • High throughput: up to 100,000 messages per second on an ordinary server.
  • High reliability: message persistence and the replica mechanism ensure that messages are reliable and can be consumed multiple times.
  • High scalability: like other distributed systems, all components are distributed and support automatic load balancing, so the system can be expanded quickly and conveniently.
  • Offline and real-time processing coexist: both online and offline message processing are supported.

Because of these features, Kafka is widely used for application decoupling, traffic peak shaving, and asynchronous messaging, in roles such as message middleware, log aggregation, and stream processing.

This article introduces Kafka from the following aspects:

  1. Chapter 1 briefly introduces the characteristics and advantages of Kafka as a distributed publish-subscribe messaging system.

  2. Chapter 2 describes topics and logs in Kafka: how messages are stored, retrieved, and deleted.

  3. Chapter 3 introduces the Kafka replica mechanism and explains how Kafka achieves high message reliability internally.

  4. Chapter 4 looks at the producer side: the partitioning algorithm and idempotence.

  5. Chapter 5 looks at the consumer side: consumer groups, consumer offsets, and the rebalancing mechanism.

  6. The last chapter briefly summarizes the article.

2. Topics and logs

2.1 Topics

A topic is a logical concept for storing messages and can be simply understood as a collection of one class of messages, created by the user. A topic in Kafka typically has multiple subscribers consuming its messages, and multiple producers can write messages to it.

Each topic can be divided into multiple partitions, each of which stores different messages. When a message is appended to a partition, it is assigned an offset (increasing from 0) that is guaranteed to be unique within the partition. The order of messages within a partition is guaranteed by the offset, that is, messages in the same partition are ordered, as shown in the figure below.

Partitions of the same topic are assigned to different brokers, which is the basis of the Kafka cluster's scalability.

For example, the topic nginx_access_log has three partitions, as shown in the figure above. A partition logically corresponds to a log and physically corresponds to a folder.

drwxr-xr-x 2 root root 4096 Oct 11 20:07 nginx_access_log-0/
drwxr-xr-x 2 root root 4096 Oct 11 20:07 nginx_access_log-1/
drwxr-xr-x 2 root root 4096 Oct 11 20:07 nginx_access_log-2/

When a message is written to a partition, it is actually written to the folder where the partition resides. The log is divided into multiple segments, each consisting of a log file and index files. Each segment has a limited size (set by log.segment.bytes in the Kafka cluster configuration file; the default is 1073741824 bytes, that is, 1 GB). When a segment exceeds the limit, a new segment is created, and incoming messages are written only to the latest segment (sequential I/O).

-rw-r--r-- 1 root root    1835920 Oct 11 00000000000000000000.index
-rw-r--r-- 1 root root 1073741684 Oct 11 00000000000000000000.log
-rw-r--r-- 1 root root    2737884 Oct 11 00000000000000000000.timeindex
-rw-r--r-- 1 root root    1828296 Oct 11 00000000000003257573.index
-rw-r--r-- 1 root root 1073741513 Oct 11 00000000000003257573.log
-rw-r--r-- 1 root root    2725512 Oct 11 00000000000003257573.timeindex
-rw-r--r-- 1 root root    1834744 Oct 11 00000000000006506251.index
-rw-r--r-- 1 root root 1073741771 Oct 11 00000000000006506251.log
-rw-r--r-- 1 root root    2736072 Oct 11 00000000000006506251.timeindex
-rw-r--r-- 1 root root    1832152 Oct 11 00000000000009751854.index
-rw-r--r-- 1 root root 1073740984 Oct 11 00000000000009751854.log
-rw-r--r-- 1 root root    2731572 Oct 11 00000000000009751854.timeindex
-rw-r--r-- 1 root root    1808792 Oct 11 00000000000012999310.index
-rw-r--r-- 1 root root 1073741584 Oct 11 00000000000012999310.log
-rw-r--r-- 1 root root         10 Oct 11 00000000000012999310.snapshot
-rw-r--r-- 1 root root    2694564 Oct 11 00000000000012999310.timeindex
-rw-r--r-- 1 root root   10485760 Oct 11 00000000000016260431.index
-rw-r--r-- 1 root root  278255892 Oct 11 00000000000016260431.log
-rw-r--r-- 1 root root         10 Oct 11 00000000000016260431.snapshot
-rw-r--r-- 1 root root   10485756 Oct 11 00000000000016260431.timeindex
-rw-r--r-- 1 root root          8 Oct 11 leader-epoch-checkpoint

A segment consists of several files with different suffixes. The offset of the first message in the segment is used as the segment's base offset. The base offset is zero-padded to a fixed length of 20 digits and used as the name of the index files and the log file, for example 00000000000003257573.index, 00000000000003257573.log, and 00000000000003257573.timeindex. Files with the same name (ignoring the suffix) belong to the same segment. Besides .index, .timeindex, and .log, there are other file types; their meanings are as follows:

File type | Role
.index | Offset index file. Records the mapping <relative offset, position>. The relative offset is the message's offset relative to the first message in the segment (the base offset); the position is the start address of that message in the segment's log file.
.timeindex | Timestamp index file. Records <timestamp, relative offset> mappings.
.log | Log file that stores the message data.
.snapshot | Snapshot file.
.deleted | When a segment is deleted, the .deleted suffix is first added to all files of the segment, and the delete-file task then deletes them after a delay (configurable with file.delete.delay.ms).
.cleaned | Temporary file used during log cleaning.
.swap | Temporary file produced after log compaction.
leader-epoch-checkpoint | Checkpoint file for the leader epoch entries (see Section 3.5).
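
The naming rule for segment files can be sketched in a few lines of Python. The helper name segment_file_names is hypothetical and only illustrates the zero-padded, 20-digit base offset described above; it is not part of Kafka.

```python
def segment_file_names(base_offset):
    """Return the three main file names of the segment starting at base_offset."""
    if base_offset < 0:
        raise ValueError("base offset must be non-negative")
    # The base offset is zero-padded to a fixed width of 20 digits.
    stem = f"{base_offset:020d}"
    return [f"{stem}.index", f"{stem}.log", f"{stem}.timeindex"]


for name in segment_file_names(3257573):
    print(name)
```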

2.2 Log Index

First, the .index file. Taking 00000000000003257573.index as an example, we can view the content of the index file with the following command. The output has the structure <offset, position>. In fact, the index file does not store the absolute offset (such as 3257687) but the relative offset (such as 114); the relative offset of the first message in the segment is 0, and the base offset is added back when the output is formatted. In the output below, the first entry corresponds to <114, 17413>: the relative offset of the message within the segment is 114, so its offset is 3257573+114, that is, 3257687. The position is the physical address of that message in the .log file, so the physical address for an offset can be obtained from the .index file. The index is a sparse index, which means that not every message in the segment has an entry in the index file (the same applies to the .timeindex file). The main reason is to save disk space and memory, because index files are eventually mapped into memory.

#View the first 10 records of the fragment index file
bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index |head -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index
offset: 3257687 position: 17413
offset: 3257743 position: 33770
offset: 3257799 position: 50127
offset: 3257818 position: 66484
offset: 3257819 position: 72074
offset: 3257871 position: 87281
offset: 3257884 position: 91444
offset: 3257896 position: 95884
offset: 3257917 position: 100845
#View the last 10 records of the fragment index file
$ bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.index |tail -n 10
offset: 6506124 position: 1073698512
offset: 6506137 position: 1073702918
offset: 6506150 position: 1073707263
offset: 6506162 position: 1073711499
offset: 6506176 position: 1073716197
offset: 6506188 position: 1073720433
offset: 6506205 position: 1073725654
offset: 6506217 position: 1073730060
offset: 6506229 position: 1073734174
offset: 6506243 position: 1073738288

For example, to look up the message whose offset is 6506155: first find the segment that contains the offset, which is 00000000000003257573 (the segment with the largest base offset not greater than 6506155). Then use binary search in 00000000000003257573.index to find the largest index entry whose offset is not greater than 6506155, which gives <offset: 6506150, position: 1073707263>. Finally, scan 00000000000003257573.log sequentially from position 1073707263 until the message with offset 6506155 is found.
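
The two binary searches in this lookup can be sketched as follows. This is a minimal illustration, not Kafka's implementation: the function names are hypothetical, and the data is a small sample copied from the directory listing and index dump shown earlier.

```python
from bisect import bisect_right

# Base offsets of the segments in the partition directory listed earlier.
SEGMENT_BASE_OFFSETS = [0, 3257573, 6506251, 9751854, 12999310, 16260431]

# A few (offset, position) entries from the tail of 00000000000003257573.index.
INDEX_ENTRIES = [
    (6506124, 1073698512),
    (6506137, 1073702918),
    (6506150, 1073707263),
    (6506162, 1073711499),
    (6506176, 1073716197),
]


def find_segment(base_offsets, target):
    """Return the largest base offset that is <= target."""
    i = bisect_right(base_offsets, target) - 1
    if i < 0:
        raise KeyError(f"offset {target} is before the first segment")
    return base_offsets[i]


def find_index_entry(entries, target):
    """Return the last (offset, position) entry whose offset is <= target.

    Returns None when the target precedes every entry, in which case a
    reader would have to scan the log file from its beginning.
    """
    offsets = [offset for offset, _ in entries]
    i = bisect_right(offsets, target) - 1
    return entries[i] if i >= 0 else None


segment = find_segment(SEGMENT_BASE_OFFSETS, 6506155)
entry = find_index_entry(INDEX_ENTRIES, 6506155)
print(segment, entry)
```

Because the index is sparse, the entry only gives a starting position; the remaining messages up to the target offset still have to be read sequentially from the .log file.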

Since version 0.10.0.0, Kafka has added a .timeindex file to each log segment so that messages can be located by timestamp. Similarly, we can use the script kafka-dump-log.sh to view the contents of the time index file.

#View the first 10 records of the fragment time index file
bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex |head -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex
timestamp: 1570792689308 offset: 3257685
timestamp: 1570792689324 offset: 3257742
timestamp: 1570792689345 offset: 3257795
timestamp: 1570792689348 offset: 3257813
timestamp: 1570792689357 offset: 3257867
timestamp: 1570792689361 offset: 3257881
timestamp: 1570792689364 offset: 3257896
timestamp: 1570792689368 offset: 3257915
timestamp: 1570792689369 offset: 3257927

#View the last 10 records of the fragment time index file
bin/kafka-dump-log.sh --files /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex |tail -n 10
Dumping /tmp/kafka-logs/nginx_access_log-1/00000000000003257573.timeindex
timestamp: 1570793423474 offset: 6506136
timestamp: 1570793423477 offset: 6506150
timestamp: 1570793423481 offset: 6506159
timestamp: 1570793423485 offset: 6506176
timestamp: 1570793423489 offset: 6506188
timestamp: 1570793423493 offset: 6506204
timestamp: 1570793423496 offset: 6506214
timestamp: 1570793423500 offset: 6506228
timestamp: 1570793423503 offset: 6506240
timestamp: 1570793423505 offset: 6506248

For example, to find the messages starting from timestamp 1570793423501:

  1. Locate the segment. Compare 1570793423501 with the maximum timestamp of each segment (the maximum timestamp is the timestamp of the last entry in the time index file; if that value is 0, the last modification time of the log segment is used) until a segment whose maximum timestamp is greater than or equal to 1570793423501 is found. This locates the time index file 00000000000003257573.timeindex, whose maximum timestamp is 1570793423505.
  2. Use binary search in the time index to find the first index entry whose timestamp is greater than or equal to 1570793423501, that is, <1570793423503, 6506240> (6506240 is the offset; the relative offset is 6506240-3257573 = 3248667).
  3. Using relative offset 3248667, find the largest entry in the offset index file that does not exceed it, which is <3248656, 1073734174>.
  4. Start scanning 00000000000003257573.log at position 1073734174 and look for messages whose timestamp is not smaller than 1570793423501.

2.3 Log Deletion

Unlike other message middleware, Kafka does not delete messages in the cluster based on whether they have been consumed. Like a log, messages are eventually persisted to disk, and Kafka provides policies that periodically delete or compact them (the check interval is set by log.retention.check.interval.ms, default 5 minutes). The broker configuration parameter log.cleanup.policy selects the policy: "delete" deletes old data and "compact" compacts it (the default is "delete").

2.3.1 Time-Based Log Deletion

Parameter | Default | Description
log.retention.hours | 168 | Log retention time in hours
log.retention.minutes | None | Log retention time in minutes; takes priority over log.retention.hours
log.retention.ms | None | Log retention time in milliseconds; takes priority over log.retention.minutes

If messages have been retained in the cluster longer than the threshold (log.retention.hours, 168 hours by default, that is, seven days), they need to be deleted. The maximum timestamp of a log segment is used to decide whether the segment meets the deletion condition. The maximum timestamp is taken from the last entry in the timestamp index file: if that timestamp is greater than 0, it is used; otherwise the last modification time of the segment is used.

The last modification time is not used directly because the segment's files may be modified unintentionally, which would make the time inaccurate.

If all log segments in the partition have expired, a new log segment is first created as the file for new writes, and then the deletion is performed.
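
The time-based check can be sketched as below. This is a simplified model rather than Kafka's code: the function name and the (base_offset, max_timestamp_ms) tuples are hypothetical, and the sketch assumes segments are examined from oldest to newest and that the scan stops at the first segment that has not expired.

```python
SEVEN_DAYS_MS = 168 * 60 * 60 * 1000  # default log.retention.hours = 168


def expired_segments(segments, now_ms, retention_ms=SEVEN_DAYS_MS):
    """Return the base offsets of the oldest segments whose data has expired.

    segments: list of (base_offset, max_timestamp_ms), oldest first.
    """
    expired = []
    for base_offset, max_timestamp_ms in segments:
        if now_ms - max_timestamp_ms > retention_ms:
            expired.append(base_offset)
        else:
            break  # assumption: stop at the first segment that is still fresh
    return expired


segments = [(0, 1_000), (3257573, 2_000), (6506251, 700_000_000)]
print(expired_segments(segments, now_ms=SEVEN_DAYS_MS + 1_500))
```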

2.3.2 Size-Based Log Deletion

Parameter | Default | Description
log.retention.bytes | Disabled by default, that is, unlimited (example value: 1073741824, that is, 1 GB) | Total size of the log files, not the size of a single segment
log.segment.bytes | 1073741824 (1 GB) | Size of a single log segment

First, the amount of log data to delete is calculated as diff = totalSize - log.retention.bytes. Then, starting from the oldest segment, Kafka collects the set of segments that can be deleted (a segment qualifies if diff - segment.size >= 0, after which diff is reduced by that segment's size), and finally performs the deletion.
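
The size-based rule can be illustrated with the following sketch. The function name is hypothetical, and a negative retention_bytes is assumed here to mean that the limit is disabled.

```python
def count_segments_to_delete(segment_sizes, retention_bytes):
    """Return how many of the oldest segments should be deleted.

    segment_sizes: sizes in bytes, oldest segment first.
    """
    if retention_bytes < 0:  # assumption: a negative value disables the limit
        return 0
    diff = sum(segment_sizes) - retention_bytes
    count = 0
    for size in segment_sizes:
        # A whole segment is deleted only if the log stays at or above
        # the retention size without it.
        if diff - size >= 0:
            diff -= size
            count += 1
        else:
            break
    return count


print(count_segments_to_delete([10, 10, 10, 10], retention_bytes=20))
```

Note that only whole segments are removed, so the log may stay somewhat larger than the configured limit.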

2.3.3 Log Deletion Based on the Log Start Offset

In general, the log start offset (logStartOffset) equals the baseOffset of the first log segment, but it can grow as messages are deleted. logStartOffset is the smallest message offset in the log, and messages with offsets below it are cleaned up. As shown in the figure above, assume logStartOffset=7421048; the deletion process is as follows:

  • Start from the oldest segment and check whether the baseOffset of the next segment is less than or equal to logStartOffset. If it is, the current segment is deleted, so the first segment is deleted.
  • The next segment after segment 2 has baseOffset=6506251 < 7421048, so segment 2 is deleted.
  • The next segment after segment 3 has baseOffset=9751854 > 7421048, so segment 3 is not deleted.
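
The steps above can be sketched as follows. The function name is hypothetical, and the base offsets are taken from the segment listing shown earlier.

```python
def segments_below_log_start(base_offsets, log_start_offset):
    """Return base offsets of segments that lie entirely below log_start_offset.

    A segment can be deleted when the base offset of the NEXT segment is
    less than or equal to log_start_offset.
    """
    deletable = []
    for current, next_base in zip(base_offsets, base_offsets[1:]):
        if next_base <= log_start_offset:
            deletable.append(current)
        else:
            break
    return deletable


print(segments_below_log_start([0, 3257573, 6506251, 9751854], 7421048))
```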

2.4 Log Compaction

When the broker configuration parameter log.cleanup.policy is set to "compact", log compaction is performed. Compaction here is different from ordinary compression: for messages with the same key, only the value of the latest version is retained, as shown in the figure below. Before compaction, offsets increase continuously; after compaction, offsets may no longer be continuous. In the figure, only five messages are retained.

The cleaner-offset-checkpoint file in the Kafka log directory records the cleaned offset of each partition of each topic. This offset divides the log of a partition into two parts: the clean part has already been compacted, and the dirty part has not, as shown in the figure below. (The active segment does not take part in compaction because new data is still being written to it.)

-rw-r--r-- 1 root root    4 Oct 11 19:02 cleaner-offset-checkpoint
drwxr-xr-x 2 root root 4096 Oct 11 20:07 nginx_access_log-0/
drwxr-xr-x 2 root root 4096 Oct 11 20:07 nginx_access_log-1/
drwxr-xr-x 2 root root 4096 Oct 11 20:07 nginx_access_log-2/
-rw-r--r-- 1 root root    0 Sep 18 09:50 .lock
-rw-r--r-- 1 root root    4 Oct 16 11:19 log-start-offset-checkpoint
-rw-r--r-- 1 root root   54 Sep 18 09:50 meta.properties
-rw-r--r-- 1 root root 1518 Oct 16 11:19 recovery-point-offset-checkpoint
-rw-r--r-- 1 root root 1518 Oct 16 11:19 replication-offset-checkpoint
#cat cleaner-offset-checkpoint
nginx_access_log 0 5033168
nginx_access_log 1 5033166
nginx_access_log 2 5033168

During log compaction, the ratio of dirty data to the total log size (cleanableRatio) determines which logs are compacted first. A mapping from key to offset is then built for the dirty data (the largest offset for each key is kept) and stored in a SkimpyOffsetMap. Next, the data in the segments is copied, keeping only the messages recorded in the SkimpyOffsetMap. After compaction, the related log files become smaller. To avoid log files and index files that are too small, all segments are grouped during compaction (the total segment size of a group does not exceed log.segment.bytes), and the segments of one group are compacted into a single segment.

As shown in the figure above, before any message is compacted the clean checkpoint is 0, indicating that the partition has not been compacted. After the first compaction, the log file of each segment becomes smaller, and the clean checkpoint moves to the offset at which the compaction ended. During the second compaction, the first two segments {0.5GB, 0.4GB} form one group and {0.7GB, 0.2GB} form another group for compaction, and so on.
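
Both ideas, keeping only the latest message per key and grouping segments by size, can be sketched in Python. This is an illustration only: a plain dict stands in for the SkimpyOffsetMap, the function names are hypothetical, and segment sizes are given in MB for readability.

```python
def compact(messages):
    """Keep only the latest message for each key.

    messages: list of (offset, key, value) tuples in offset order.
    """
    latest_offset = {}  # stand-in for the key -> offset map
    for offset, key, _value in messages:
        latest_offset[key] = offset  # later offsets overwrite earlier ones
    return [m for m in messages if latest_offset[m[1]] == m[0]]


def group_segments(sizes, max_group_size):
    """Group consecutive segments so each group stays within max_group_size."""
    groups, current, total = [], [], 0
    for size in sizes:
        if current and total + size > max_group_size:
            groups.append(current)
            current, total = [], 0
        current.append(size)
        total += size
    if current:
        groups.append(current)
    return groups


log = [(0, "k1", "a"), (1, "k2", "b"), (2, "k1", "c"), (3, "k3", "d"), (4, "k2", "e")]
print(compact(log))
print(group_segments([500, 400, 700, 200], max_group_size=1024))
```

The surviving messages keep their original offsets, which is why offsets are no longer continuous after compaction.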

The following figure shows the log compaction process:

  1. Calculate deleteHorizonMs. When the value of a message is empty, the message is retained for a period of time and is deleted in the next compaction after the timeout. deleteHorizonMs is therefore calculated so that messages with empty values in older segments can be deleted. (deleteHorizonMs = lastModifiedTime of the last segment in the clean part - deleteRetentionMs, where deleteRetentionMs is set by log.cleaner.delete.retention.ms and defaults to 24 hours.)
  2. Determine the offset range [firstDirtyOffset, endOffset) of the dirty part to compact. firstDirtyOffset is the start offset of the dirty part and is usually equal to the clean checkpoint. firstUncleanableOffset is the smallest offset that cannot be cleaned and is generally equal to the baseOffset of the active segment. The log segments are then traversed from firstDirtyOffset, and the key-to-offset mappings are put into the SkimpyOffsetMap. When the map is full or firstUncleanableOffset is reached, the upper limit endOffset of the compaction is determined.
  3. The log segments in [logStartOffset, endOffset] are grouped and then compacted group by group.

3. Replicas

Kafka supports message redundancy. The number of replicas of a topic can be set (the --replication-factor parameter specifies it when the topic is created; offsets.topic.replication.factor sets the number of replicas of the consumer offsets topic __consumer_offsets and defaults to 3). Every replica contains the same messages, although not always exactly the same ones: a follower replica may lag slightly behind the leader replica. For each partition, one replica in the replica set is elected as the leader replica, and the others are follower replicas. All read and write requests are served by the leader replica, while follower replicas only synchronize data from the leader into their own copy of the partition. If the broker hosting the leader replica goes down, a new leader replica is elected to serve requests.

3.1 ISR Set

ISR (In-Sync Replica) set: the set of currently available replicas. The leader replica of each partition maintains the ISR set for that partition. "Available" here means that the follower's messages do not lag far behind the leader's. A replica in the ISR set must meet the following conditions:

  1. The node where the replica resides must maintain heartbeat communication with ZooKeeper.
  2. The offset of the follower's last message must be within a threshold of the offset of the leader's last message (replica.lag.max.messages), or the time for which the follower's LEO has lagged behind the leader's must not exceed a threshold (replica.lag.time.max.ms). The latter is officially recommended, and the replica.lag.max.messages parameter has been removed in newer Kafka releases (it is no longer available as of 0.10.0).

If a follower stops meeting either of the above conditions, it is removed from the ISR set, and it is added back once it meets them again. ISR was introduced mainly to address the respective weaknesses of synchronous and asynchronous replication: with synchronous replication, one replica that is down or slow drags down the performance of the whole replica group; with purely asynchronous replication, if all followers lag far behind the leader, messages are lost once the leader goes down and a new leader is elected.
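
The time-based membership rule can be modeled roughly as follows. This is a simplified sketch, not Kafka's implementation: the function name, the last_caught_up_ms bookkeeping, and the alive set (standing in for the ZooKeeper heartbeat) are all assumptions made for illustration.

```python
def compute_isr(last_caught_up_ms, alive, now_ms, max_lag_ms=10_000):
    """Return the set of replica ids that qualify for the ISR.

    last_caught_up_ms: replica id -> last time the replica was caught up
                       with the leader's LEO (milliseconds).
    alive:             set of replica ids whose heartbeat is still healthy.
    max_lag_ms:        plays the role of replica.lag.time.max.ms.
    """
    return {
        replica
        for replica, caught_up_ms in last_caught_up_ms.items()
        if replica in alive and now_ms - caught_up_ms <= max_lag_ms
    }


state = {1: 100_000, 2: 95_000, 3: 80_000}
print(compute_isr(state, alive={1, 2, 3}, now_ms=100_000))
```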

3.2 HW & LEO

HW (High Watermark) is a special offset marker. A consumer can only pull messages whose offsets are smaller than the HW; the message at the HW and later messages are invisible to consumers. This value is managed by the leader replica. Once the replicas in the ISR set have synchronized the message at the HW, the leader increases the HW by 1 so that it points to the next offset, which guarantees the reliability of the messages before the HW.

LEO (Log End Offset) is the offset of the next message to be written to the current replica, that is, the offset after its latest message. Every replica has this marker. On the leader replica, the LEO increases by 1 each time a producer appends a message; on a follower replica, the LEO increases when messages are successfully pulled from the leader.

3.2.1 Updating LEO and HW on a Follower

A follower obtains data by sending fetch requests to the leader. A follower's LEO is stored in two places: on the follower's own node and on the leader's node. The copy on the follower's own node is used to update the follower's HW, while the copies of the follower LEOs kept on the leader are used to update the leader's HW. When a new message is written to the follower's log, the follower increases its own LEO. When the leader receives a fetch request from a follower, it first reads the corresponding data from its own log and, before returning the data, updates the LEO it keeps for that follower. Once the follower has written the data, it tries to update its own HW: it compares its own LEO with the leader HW returned in the fetch response and takes the smaller value as the new HW.

3.2.2 Updating LEO and HW on the Leader

The leader updates its LEO when it writes to its log, in the same way as a follower. The HW of the leader is the HW of the partition and determines which data in the partition is visible to consumers. The leader tries to update its HW in the following four situations:

  • A replica becomes the leader: when a replica becomes the leader, Kafka tries to update the HW of the partition.
  • A broker crash causes a replica to be removed from the ISR set: when a broker node crashes, Kafka checks whether the partition is affected and then whether the HW of the partition needs to be updated.
  • A producer writes a message to the leader: writing a message increases the leader's LEO, so Kafka checks whether the HW needs to be updated.
  • The leader receives a fetch request from a follower: the leader tries to update the partition HW while processing the follower's fetch request.

The leader holds the LEO values of the followers as well as its own. It compares the LEO values of all replicas that meet the conditions and chooses the smallest LEO as the partition HW. A replica meets the conditions if either of the following holds:

  • The replica is in the ISR set.
  • The time for which the replica's LEO has lagged behind the leader's does not exceed the threshold (replica.lag.time.max.ms, 10 seconds by default).
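
The two update rules reduce to taking a minimum, as the following sketch shows. The function names are hypothetical; the numbers in the usage example are the ones used in the data loss scenario of Section 3.3.

```python
def leader_hw(leader_leo, qualifying_follower_leos):
    """Partition HW: the smallest LEO among the leader and qualifying followers."""
    return min([leader_leo, *qualifying_follower_leos])


def follower_hw(follower_leo, leader_hw_in_response):
    """Follower HW: the smaller of its own LEO and the HW in the fetch response."""
    return min(follower_leo, leader_hw_in_response)


# Leader A has LEO=2 and believes follower B has LEO=1.
hw_a = leader_hw(2, [1])
# Follower B has just written message 1 (LEO=2) but was told HW=1.
hw_b = follower_hw(2, hw_a)
print(hw_a, hw_b)
```

The follower's HW therefore trails the leader's by one fetch round, which is the gap that the scenarios in the following sections exploit.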

3.3 Data Loss Scenario

If only the HW is used as the basis for log truncation and watermark decisions, data can be lost. As shown in the figure above, suppose there are two replicas A and B, where A is initially the leader and B the follower, and min.insync.replicas=1, so a write is reported as successful even if the ISR contains only one replica:

  • Initially, leader A has written two messages (HW=1, LEO=2, LEOB=1), and follower B has written one message (HW=1, LEO=1).
  • Follower B sends a fetchOffset=1 request to leader A. On receiving it, the leader updates LEOB=1, which indicates that B has received message 0, and then tries to update the HW; since min(LEO, LEOB)=1, the HW stays at 1. The leader returns message 1 together with HW=1. After receiving the response, follower B writes the message to its log and updates LEO=2, then updates its HW=1. Although B has now written two messages, its HW will only be updated to 2 in the next round of requests.
  • Suppose follower B restarts at this point. After the restart, it truncates its log according to the HW, that is, message 1 is deleted.
  • Follower B sends a fetchOffset=1 request to leader A. If leader A is healthy, B simply fetches message 1 again. But suppose leader A goes down at this moment; follower B then becomes the leader.
  • When replica A recovers, it becomes a follower and truncates its log according to the HW, so message 1 is permanently lost.

3.4 Data Inconsistency Scenario

As shown in the figure, suppose there are two replicas A and B, where A is initially the leader and B the follower, and min.insync.replicas=1, so a write is reported as successful even if the ISR contains only one replica:

  • Initially, leader A has written two messages (HW=1, LEO=2, LEOB=1), and follower B has also synchronized two messages (HW=1, LEO=2).
  • Follower B sends a fetchOffset=2 request to leader A. On receiving it, leader A updates the partition HW=2 and returns this value to follower B. If follower B goes down at this moment, its HW update is not written.
  • Assume that leader A also goes down and that follower B recovers first and becomes the leader. B truncates its log and keeps only message 0. It then serves requests, and suppose a new message 1 is written (this message differs from the previous message 1; the figure marks different messages with different colors).
  • When replica A comes back, it becomes a follower. No truncation occurs because its HW=2, but the messages at offset 1 on A and B are now different.

3.5 Leader Epoch Mechanism

Using the HW to judge whether replication has succeeded, and as the basis for log truncation after a failure, can cause data loss and data inconsistency. Therefore, newer Kafka versions (0.11.0.0 and later) introduced the leader epoch. A leader epoch entry is a key-value pair <epoch, offset>, where epoch is the version number of the leader, numbered from 0 and incremented by 1 each time the leader changes, and offset is the position at which the leader of that epoch wrote its first message. For example, <0,0> means that the first leader wrote messages starting from offset 0, and <1,100> means that the second leader has version 1 and wrote messages starting from offset 100. The leader keeps this information in a cache and writes it to a checkpoint file periodically. The following briefly describes how the leader epoch works:

  • Every message contains a 4-byte leader epoch number.
  • Each log directory has a leader epoch sequence file that stores the leader version numbers and their start offsets.
  • When a replica becomes the leader, it appends a new entry to the end of the leader epoch sequence file, and every new message then carries the new leader epoch value.
  • After a replica goes down and restarts, it performs the following steps:
    • Restore all leader epochs from the leader epoch sequence file.
    • Send a LeaderEpoch request to the partition's leader; the request contains the latest leader epoch value from the follower's leader epoch sequence file.
    • The leader returns the lastOffset for that leader epoch. There are two cases: if the leader epoch in the request is smaller than the leader's latest epoch, lastOffset is the start offset of the leader's next epoch after the requested one; if it is equal to the leader's latest epoch, lastOffset is the leader's current LEO.
    • If the start offset of the follower's leader epoch is greater than the lastOffset returned by the leader, the follower resets its leader epoch sequence so that it is consistent with the leader's.
    • The follower truncates its local log to the lastOffset returned by the leader.
    • The follower starts pulling data from the leader.
    • While pulling data, if the follower finds that the leader epoch in a message is larger than its own latest leader epoch, it writes that leader epoch to its leader epoch sequence file and then continues synchronizing.
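
The leader's answer to the epoch request and the follower's truncation can be sketched as follows. This is a simplified model under stated assumptions: the function names are hypothetical, the leader epoch sequence is a plain list of (epoch, start_offset) pairs, the log is a list whose index is the offset, and None is returned when no answer can be derived.

```python
def last_offset_for_epoch(epoch_entries, leader_leo, requested_epoch):
    """Return the lastOffset the leader reports for requested_epoch.

    epoch_entries: list of (epoch, start_offset), ascending by epoch.
    """
    if not epoch_entries:
        return None
    latest_epoch = epoch_entries[-1][0]
    if requested_epoch == latest_epoch:
        return leader_leo  # same epoch: the leader's current LEO
    for epoch, start_offset in epoch_entries:
        if epoch > requested_epoch:
            return start_offset  # start of the next epoch after the requested one
    return None  # requested epoch is newer than anything the leader knows


def truncate(log, last_offset):
    """Drop every message at or beyond last_offset (list index == offset)."""
    return log[:last_offset]


# Epochs match (leader LEO=2), so nothing needs to be truncated.
print(last_offset_for_epoch([(0, 0)], leader_leo=2, requested_epoch=0))
# The leader is already at epoch 1, which started at offset 1.
last = last_offset_for_epoch([(0, 0), (1, 1)], leader_leo=2, requested_epoch=0)
print(truncate(["m0", "old m1"], last))
```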

How does the leader epoch mechanism avoid the two failure scenarios described earlier?

3.5.1 Data Loss Scenario Resolved

  • As shown in the figure, follower B restarts and sends an OffsetsForLeaderEpochRequest to leader A. Because the leader and follower epochs are equal, A returns its current LEO=2. Follower B has no offset greater than 2, so no truncation is needed.
  • When follower B sends a fetchOffset=2 request to leader A, A is down, so follower B becomes the leader, updates its epoch to 1, and updates its HW to 2.
  • When A recovers, it becomes a follower and sends a fetchOffset=2 request to B. B returns HW=2, and follower A updates its HW to 2.
  • Leader B receives write requests, and follower A sends data synchronization requests to leader B.

The leader epoch therefore avoids the data loss described earlier. Note, however, that if in the first step above follower B's OffsetsForLeaderEpochRequest to leader A fails because leader A goes down at the same time, message 1 is still lost, as in the data inconsistency scenario described below.

3.5.2 Data Inconsistency Scenario Resolved

  • After follower B recovers, it sends an OffsetsForLeaderEpochRequest to leader A. Because the leader is also down, B becomes the leader, truncates message 1, and then accepts a new write as message 1.
  • After replica A recovers, it becomes the follower and sends an OffsetsForLeaderEpochRequest to leader B. The requested epoch is smaller than the epoch of leader B, so B returns the start offset of epoch=1, that is, lastOffset=1, and follower A truncates message 1.
  • Follower A pulls the message from leader B and updates its epoch entry to <epoch=1, offset=1>.

It can be seen that the introduction of the epoch avoids data inconsistency. However, if both replicas fail, data can still be lost. All of the preceding discussion assumes min.insync.replicas=1, so there is a trade-off between data reliability and speed.
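
The two scenarios above can be replayed with a small simulation. This is only a sketch under stated assumptions: the function names, the list-of-tuples log layout, and the (epoch, start_offset) cache are hypothetical illustrations, not Kafka's internal code. The leader answers an epoch query with the start offset of the first epoch larger than the requested one, or with its LEO if there is none; the follower truncates to that offset and then fetches the rest.

```python
def end_offset_for_epoch(epoch_cache, log_end_offset, requested_epoch):
    """Leader side: answer a follower's epoch query.

    epoch_cache is a list of (epoch, start_offset) pairs in ascending order.
    """
    for epoch, start_offset in epoch_cache:
        if epoch > requested_epoch:
            # A newer epoch exists: the requested epoch ended where it starts.
            return start_offset
    # The requested epoch is the leader's current one: return the LEO.
    return log_end_offset


def resync_follower(follower_log, leader_log, leader_epoch_cache):
    """Follower side: truncate to the returned offset, then fetch the rest.

    Each log is a list of (epoch, payload) tuples; the list index is the offset.
    """
    follower_epoch = follower_log[-1][0] if follower_log else 0
    last_offset = end_offset_for_epoch(
        leader_epoch_cache, len(leader_log), follower_epoch
    )
    truncated = follower_log[:last_offset]
    return truncated + leader_log[len(truncated):]


# Scenario 3.5.2: A still holds the old message 1 (epoch 0), while the
# new leader B has written a different message 1 in epoch 1.
replica_a = [(0, "m0"), (0, "m1-old")]
replica_b = [(0, "m0"), (1, "m1-new")]
replica_b_epochs = [(0, 0), (1, 1)]

replica_a = resync_follower(replica_a, replica_b, replica_b_epochs)
print(replica_a)
```

After the call, replica A holds the same two entries as replica B, which is the consistent state described in 3.5.2. When both replicas share the same epoch, as in 3.5.1, the leader returns its LEO and nothing is truncated.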

4. Producers

4.1 Message partition selection

The main role of the producer is to produce messages and store them in the corresponding topic partition in Kafka. The following three policies determine which partition a message is stored in (in descending order of priority):

  • If the partition to which the message belongs is specified when the message is sent, it is sent directly to the specified partition.
  • If no partition is specified but the message has a key, the partition is selected according to the hash value of the key.
  • If neither of the preceding two conditions is met, the partition is selected in round-robin mode.
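
The three rules above can be sketched as follows. This is an illustration only: the PartitionSelector class is hypothetical, and zlib.crc32 is used as a stand-in hash (the real Java client uses a different hash function for keys).

```python
import itertools
import zlib


class PartitionSelector:
    """Hypothetical helper that applies the three rules in priority order."""

    def __init__(self, num_partitions):
        self.num_partitions = num_partitions
        self._counter = itertools.count()

    def select(self, partition=None, key=None):
        # Rule 1: an explicitly specified partition always wins.
        if partition is not None:
            return partition
        # Rule 2: otherwise hash the key (crc32 is an assumption made
        # for this sketch, not the hash used by the Kafka client).
        if key is not None:
            return zlib.crc32(key) % self.num_partitions
        # Rule 3: otherwise rotate through the partitions.
        return next(self._counter) % self.num_partitions


selector = PartitionSelector(num_partitions=3)
print(selector.select(partition=2))
print(selector.select(key=b"user-1") == selector.select(key=b"user-1"))
print([selector.select() for _ in range(4)])
```

Because the key hash is deterministic, all messages with the same key land in the same partition, which is what preserves per-key ordering.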

4.2 ACK Parameter Settings and Meanings

When the producer sends messages to the Kafka cluster, the reliability level can be set with the request.required.acks parameter (named acks in the newer Java producer):

  • 1: the default value. The leader replica in the ISR must successfully receive and acknowledge the data before the next message is sent. If the leader node goes down, data may be lost. For details, see the earlier section on replicas.
  • 0: the producer sends the next batch of data without waiting for any acknowledgement from the broker. Transmission efficiency is the highest, but data reliability is the lowest.
  • -1: the producer considers a message written only after all replicas in the ISR have received the data. This gives the highest reliability but the lowest performance. If min.insync.replicas is 1, the ISR is allowed to contain a single replica, in which case data can still be lost.
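
As a rough model of the three levels, the sketch below decides whether the producer would see a write as successful. The function name and its parameters are hypothetical; it is a simplification for reasoning about the settings, not broker code.

```python
def producer_sees_success(acks, leader_wrote, isr_size, isr_acked,
                          min_insync_replicas=1):
    """Return True if the producer would treat the write as successful.

    isr_size is the number of replicas currently in the ISR (leader included)
    and isr_acked is how many of them already hold the data.
    """
    if acks == 0:
        # Fire and forget: the producer never waits for the broker.
        return True
    if acks == 1:
        # Only the leader has to confirm the write.
        return leader_wrote
    if acks == -1:
        # The write is refused when the ISR is smaller than min.insync.replicas.
        if isr_size < min_insync_replicas:
            return False
        # Otherwise every replica in the ISR must hold the data.
        return leader_wrote and isr_acked == isr_size
    raise ValueError("acks must be 0, 1 or -1")


# With acks=-1 and min.insync.replicas=1, an ISR that has shrunk to the
# leader alone still acknowledges the write, so one copy is all that exists.
print(producer_sees_success(-1, True, isr_size=1, isr_acked=1))
```

Raising min_insync_replicas to 2 in the last call makes the same write fail instead of succeeding with a single copy, which is the trade-off between reliability and availability mentioned above.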

4.3 Idempotent properties

Idempotence means that making the same request to a resource once or several times has the same effect on that resource (network timeouts aside). Put simply, performing an operation any number of times has the same effect as performing it once. The key to idempotence is that the server can recognize repeated requests and filter them out. Implementing it usually requires the following:

  • Unique identifier: To determine whether a request is duplicate, there needs to be a unique identifier, and then the server can determine whether the request is duplicate based on this unique identifier.
  • Record requests that have been processed: The server needs to record requests that have been processed, and then determine whether the request is repeated according to the unique identifier. If the request has been processed, the server rejects the request directly or returns success without any operation.

In Kafka, producer idempotence means that a message is persisted only once in the cluster when it is sent. Producer idempotence holds only under the following conditions:

  • Idempotence is guaranteed only within a single producer session. If the producer fails for some reason and restarts, idempotence cannot be guaranteed because the previous state information is no longer available. That is, idempotence does not hold across sessions.
  • Idempotence does not span multiple topic partitions; it is guaranteed only within a single partition. When multiple partitions are involved, the intermediate state is not synchronized between them.

To support idempotence across sessions or across multiple partitions, you need to use Kafka transactions.

To implement idempotent semantics on the producer side, the concepts of Producer ID (PID) and Sequence Number are introduced:

  • Producer ID (PID): each producer is assigned a unique PID during initialization. The allocation of the PID is transparent to users.
  • Sequence Number: for a given PID, the sequence number increases monotonically from 0. Each topic partition has its own independent sequence number, which the producer attaches to each message when sending. The broker caches the sequence number of the last committed message. Only a message whose sequence number is exactly 1 greater than that of the last committed message cached for the partition is accepted; all others are rejected.

4.3.1 Message Sending Process at the Production End

The following briefly describes the workflow for sending idempotent messages:

  1. The producer uses KafkaProducer to append data to the RecordAccumulator. When data is appended, it decides whether a new ProducerBatch must be created.
  2. The background sender thread of the producer determines whether the current PID needs to be reset. A reset is needed when a batch for some partition has failed after many retries and has finally been removed because of a timeout: the sequence numbers are then no longer continuous, so subsequent messages cannot be sent. The PID is therefore reset and the related cached information is cleared. The messages in the removed batch are lost.
  3. The sender thread determines whether a new PID must be requested, and if so, blocks until the PID information is obtained.
  4. When the sender thread calls the sendProducerData() method, it makes the following checks:
    • Check whether the topic partition can continue sending messages: whether the PID is valid and, if the batch is a retry, whether the previous batch has been sent. If the previous batch has not been sent, sending for the current topic partition is skipped until it has.
    • If the ProducerBatch has not yet been assigned a PID and sequence number, they are set here.
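
The last step, assigning a PID and sequence number to a batch, can be sketched as below. The Batch and ProducerIdState classes are hypothetical names for this illustration and greatly simplify what the sender thread tracks; the sketch assumes the sequence number advances by one per record.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Batch:
    partition: int
    record_count: int
    pid: Optional[int] = None
    base_sequence: Optional[int] = None


class ProducerIdState:
    """Tracks the PID and the next sequence number of each partition."""

    def __init__(self, pid):
        self.pid = pid
        self.next_sequence = {}

    def assign(self, batch):
        # A retried batch keeps the PID and sequence it was given the first time.
        if batch.pid is None:
            sequence = self.next_sequence.get(batch.partition, 0)
            batch.pid = self.pid
            batch.base_sequence = sequence
            self.next_sequence[batch.partition] = sequence + batch.record_count
        return batch

    def reset(self, new_pid):
        # After a PID reset every partition starts again from sequence 0.
        self.pid = new_pid
        self.next_sequence.clear()


state = ProducerIdState(pid=1000)
first = state.assign(Batch(partition=0, record_count=3))
second = state.assign(Batch(partition=0, record_count=2))
print(first.base_sequence, second.base_sequence)
```

Keeping the sequence per partition is what lets the broker check continuity independently for each topic partition.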

4.3.2 Process of Receiving messages on the Server

After receiving a write request from the producer, the broker makes a series of checks to determine whether the data can be written. This section describes the checks related to idempotence.

  1. If the request is idempotent, the broker checks whether it has IdempotentWrite permission on the ClusterResource; if not, it returns the error CLUSTER_AUTHORIZATION_FAILED.
  2. Check whether PID information exists.
  3. Use the batch sequence number to check whether the batch is a duplicate. The server caches the latest five batches for each PID in each partition. If the batch is a duplicate, the server returns success but does not actually write the data.
  4. If the PID is available and the batch is not a duplicate, perform the following operations:
    • Check whether the PID is already in the cache.
    • If it is not, check whether the sequence number starts from 0. If it does, this is a new PID: record the PID information (PID, epoch, and sequence number) in the cache and then write the data. If the PID is not in the cache and the sequence number does not start from 0, an error is returned indicating that the PID has expired on the server or that the data written by the PID has expired.
    • If the PID exists, check whether its epoch matches the epoch on the server. If the epochs differ and the sequence number does not start from 0, an error is returned. If the epochs differ but the sequence number starts from 0, the data can be written normally.
    • If the epochs match, check whether the sequence number is consecutive with the latest sequence number in the cache. If it is not, an error is returned. Otherwise, the data is written normally.
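
The checks in steps 3 and 4 can be modelled for a single partition as follows. This is a sketch, not broker code: the ProducerStateCache class and the result strings are hypothetical labels, and permission checks are left out.

```python
from collections import deque


class ProducerStateCache:
    """Per-partition cache of producer state, keyed by PID."""

    def __init__(self):
        self._entries = {}

    def try_append(self, pid, epoch, first_seq, last_seq):
        entry = self._entries.get(pid)
        batch = (epoch, first_seq, last_seq)
        # Step 3: a batch found among the last five is acknowledged, not rewritten.
        if entry is not None and batch in entry["recent"]:
            return "DUPLICATE"
        if entry is None:
            # Unknown PID: only a sequence starting at 0 is accepted.
            if first_seq != 0:
                return "UNKNOWN_PRODUCER"
            entry = {"epoch": epoch, "last_seq": -1, "recent": deque(maxlen=5)}
            self._entries[pid] = entry
        elif entry["epoch"] != epoch:
            # Different epoch: accepted only if the sequence restarts at 0.
            if first_seq != 0:
                return "OUT_OF_ORDER"
            entry["epoch"] = epoch
            entry["recent"].clear()
        elif first_seq != entry["last_seq"] + 1:
            # Same epoch: the sequence must continue without a gap.
            return "OUT_OF_ORDER"
        entry["last_seq"] = last_seq
        entry["recent"].append(batch)
        return "APPENDED"


cache = ProducerStateCache()
print(cache.try_append(pid=1, epoch=0, first_seq=0, last_seq=4))
print(cache.try_append(pid=1, epoch=0, first_seq=5, last_seq=9))
print(cache.try_append(pid=1, epoch=0, first_seq=5, last_seq=9))
```

The third call repeats the second batch, so it is reported as a duplicate and would be acknowledged without a second write.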

5. Consumers

Consumers pull messages from the Kafka cluster and then run their consumption logic on them. Each consumer controls its own consumption progress, which adds flexibility: for example, a consumer can consume certain messages again or skip some messages.

5.1 Consumer groups

Multiple consumers can form a consumer group, and each consumer belongs to only one consumer group. Each partition of a topic subscribed to by a consumer group is assigned to exactly one consumer in that group, and different consumer groups are isolated from each other with no dependencies. Within a consumer group, a given message can be consumed by only one consumer. If the same message must be consumed by multiple consumers, each of those consumers must belong to a different consumer group, with only one consumer in each group. Consumer groups thus make it possible to achieve either an “exclusive” or a “broadcast” consumption effect.

  • A consumer group can have multiple consumers, and the number of consumers can be dynamically changed.
  • Each partition of a topic subscribed to by the consumer group is assigned to only one consumer in the consumer group.
  • A consumer group is identified by group.id. Consumers with the same group.id belong to the same consumer group.
  • Different consumer groups are isolated from each other.

As shown in the figure, consumer group 1 consists of two consumers: consumer 1 is assigned partition 0, and consumer 2 is assigned partitions 1 and 2. Consumer groups also support horizontal scaling and failover of consumers. For example, the figure above shows that consumer 2 lacks consumption capacity and lags behind consumer 1. We can add a consumer to the consumer group to improve its overall consumption capacity, as shown in the figure below.

Suppose the machine running consumer 1 goes down. The consumer group triggers a rebalance, and partition 0 is, say, assigned to consumer 2, as shown in the figure below. The number of consumers in a consumer group should not exceed the number of partitions of the topic. If it does, the excess consumers are not assigned any partition, because once a partition is assigned to a consumer the assignment does not change unless the number of consumers in the group changes and a rebalance occurs.

5.2 Consumer offsets

5.2.1 Consumer offsets topic

Starting with Kafka 0.9, consumer offset information is stored in an internal topic of the cluster (__consumer_offsets), which has 50 partitions by default. Each log entry has the format <TopicPartition, OffsetAndMetadata>. The key is the topic partition and holds the topic, partition, and consumer group information. The value is an OffsetAndMetadata object that holds the offset, the offset commit time, and user-defined metadata. Data is written to this topic only when a consumer group commits offsets to Kafka. If the consumer stores its offsets in external storage, the topic contains no offset information for it. The kafka-console-consumer.sh script can be used to view the offsets stored in the topic:

# bin/kafka-console-consumer.sh --topic __consumer_offsets --bootstrap-server localhost:9092 --formatter "kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" --consumer.config config/consumer.properties --from-beginning

[consumer-group01,nginx_access_log,2]::OffsetAndMetadata(offset=17104625, leaderEpoch=Optional.[0], metadata=, commitTimestamp=1573475863555, expireTimestamp=None)
[consumer-group01,nginx_access_log,1]::OffsetAndMetadata(offset=17103024, leaderEpoch=Optional.[0], metadata=, commitTimestamp=1573475863555, expireTimestamp=None)
[consumer-group01,nginx_access_log,0]::OffsetAndMetadata(offset=17107771, leaderEpoch=Optional.[0], metadata=, commitTimestamp=1573475863555, expireTimestamp=None)

5.2.2 Automatic offset commit

On the consumer side, enable.auto.commit controls whether offsets are committed automatically or manually. If the value is true, offsets are committed automatically: the consumer commits them periodically in the background at the interval set by auto.commit.interval.ms, which defaults to 5 seconds.

However, automatic commit has the following problems:

  1. Duplicate offset data may be committed to the consumer offsets topic, because a message is written to the topic every 5 seconds whether or not there are new consumption records. This produces many messages with the same key when only one is needed, so the log compaction strategy mentioned earlier is required to clean up the data.
  2. If a rebalance occurs within the 5-second interval, all consumers start consuming from the offsets they last committed, so the data consumed between the last commit and the rebalance is consumed again.
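
The second problem can be made concrete with a toy timeline. The sketch assumes a hypothetical consumer that processes exactly one message per tick and auto-commits every five ticks; the function name and the tick model are illustrative only.

```python
def replayed_offsets(rebalance_tick, commit_interval_ticks=5):
    """Return the offsets that are consumed again after a rebalance."""
    committed = 0  # offset of the next message to read, as last committed
    position = 0   # offset of the next message the consumer will read
    for tick in range(1, rebalance_tick + 1):
        position += 1  # one message is processed during this tick
        if tick % commit_interval_ticks == 0:
            committed = position
    # After the rebalance, consumption restarts from the committed offset.
    return list(range(committed, position))


print(replayed_offsets(rebalance_tick=8))
```

With a rebalance at tick 8, the last commit happened at tick 5, so the messages at offsets 5, 6 and 7 are processed twice. Consumption logic therefore has to tolerate duplicates when automatic commit is used.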

5.2.3 Manual offset commit

For manual commit, enable.auto.commit must be set to false, and the application then controls the consumption progress itself. Manual commit comes in the following three forms:

  • Synchronous manual commit: calling the synchronous method commitSync() commits the latest offsets returned by poll to the Kafka cluster and blocks until the commit succeeds.
  • Asynchronous manual commit: calling the asynchronous method commitAsync() returns immediately without blocking. Exception handling can then be performed in a callback function.
  • Commit of specified offsets: this also comes in synchronous and asynchronous forms. The argument passed is a Map<TopicPartition, OffsetAndMetadata>, where the key is the message partition and the value is the offset object.

5.3 Group Coordinator

A Group Coordinator is a service started on each node in a Kafka cluster to store metadata related to consumer groups. Each consumer group selects one Coordinator to store the offsets of each partition consumed by the group. The selection works as follows:

  • First, determine which partition stores the offsets of the consumer group. The __consumer_offsets topic has 50 partitions by default, and the partition is calculated as partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount), where groupId is the consumer group ID specified by the consumer and groupMetadataTopicPartitionCount is the number of partitions of the topic.
  • Then find the broker that hosts the leader of that partition. The Coordinator on that broker is the Coordinator of the consumer group.
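
The partition formula above can be reproduced outside the JVM. The sketch below re-implements Java's String.hashCode() in Python (the helper names are hypothetical) and applies the formula from the text with the default of 50 partitions.

```python
def java_string_hash(text):
    """Compute the value Java's String.hashCode() returns for the same string."""
    data = text.encode("utf-16-be")  # Java hashes UTF-16 code units
    value = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        value = (31 * value + unit) & 0xFFFFFFFF  # wrap like a 32-bit int
    # Reinterpret the unsigned result as a signed 32-bit integer.
    return value - (1 << 32) if value >= (1 << 31) else value


def coordinator_partition(group_id, partition_count=50):
    """partition = Math.abs(groupId.hashCode() % partition_count)."""
    # Java's % keeps the sign of the dividend, so abs(h % n) equals abs(h) % n.
    return abs(java_string_hash(group_id)) % partition_count


print(coordinator_partition("consumer-group01"))
```

The broker that leads the resulting __consumer_offsets partition hosts the Coordinator for that group, so every consumer in the group arrives at the same Coordinator.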

5.4 Rebalance mechanism

5.4.1 Rebalancing Scenario

The following scenarios trigger a rebalance operation:

  1. New consumers join the consumer group.
  2. Consumers go offline passively. For example, if a consumer fails to send a heartbeat request to a Group Coordinator due to a long GC period or network delay, the consumer is considered offline and kicked out.
  3. Consumers voluntarily quit the consumer group.
  4. The number of partitions of any topic subscribed to by the consumer group changes.
  5. A consumer unsubscribes from a topic.

5.4.2 Rebalancing Process

Rebalancing proceeds in the following stages:

  1. Find the Group Coordinator: the consumer selects the node with the lowest load in the Kafka cluster, sends it a GroupCoordinatorRequest, and processes the returned GroupCoordinatorResponse. The request contains the ID of the consumer group. The response contains the ID, host, and port of the node where the Coordinator resides.
  2. Join group: once the consumer has the Coordinator information, it sends the Coordinator a JoinGroupRequest to join the consumer group. When all consumers have sent the request, the Coordinator selects one consumer as the leader and sends it the group membership and subscription information in a JoinGroupResponse (the formats are shown in the tables below). The leader is responsible for working out the partition assignment.

JoinGroupRequest request data format

Name               Type     Description
group_id           String   ID of the consumer group
session_timeout    int      If the coordinator receives no heartbeat within the period specified by session_timeout, the consumer is considered offline
member_id          String   The ID assigned to the consumer by the coordinator
protocol_type      String   Protocol implemented by the consumer group. The default is consumer
group_protocols    List     All PartitionAssignor types supported by this consumer
protocol_name      String   PartitionAssignor type
protocol_metadata  byte[]   Serialized consumer subscription information for the given PartitionAssignor type, including user-defined data (userData)

JoinGroupResponse response data format

Name               Type     Description
error_code         short    Error code
generation_id      int      Generation information assigned by the coordinator
group_protocol     String   The PartitionAssignor type chosen by the coordinator
leader_id          String   The member_id of the leader
member_id          String   The ID assigned to the consumer by the coordinator
members            Map      Subscription information of all consumers in the consumer group
member_metadata    byte[]   Subscription information of a consumer

  3. Synchronizing Group State phase: once the leader consumer has completed the partition assignment, it sends a SyncGroupRequest to the coordinator. The other, non-leader consumers also send this request, but with an empty assignment. The coordinator responds by sending each consumer its assignment in a SyncGroupResponse. The request and response data formats are shown in the following tables:

SyncGroupRequest request data format

Name               Type     Description
group_id           String   ID of the consumer group
generation_id      int      Generation information held by the consumer group
member_id          String   The consumer ID assigned by the coordinator
member_assignment  byte[]   Partition allocation result

SyncGroupResponse response data format

Name               Type     Description
error_code         short    Error code
member_assignment  byte[]   The partitions assigned to the current consumer
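
The join and sync stages can be walked through with plain dictionaries. This is a simplified model, not the wire protocol: the run_rebalance function is hypothetical, the leader is assumed to be the first member in sorted order (the text only says the coordinator selects one consumer), the leader uses a simple round-robin plan, and the field names are borrowed from the tables above.

```python
def run_rebalance(subscriptions, partitions, generation_id=1):
    """Simulate JoinGroup and SyncGroup for one consumer group.

    subscriptions maps member_id to the list of topics it subscribes to.
    """
    member_ids = sorted(subscriptions)
    leader_id = member_ids[0]  # assumption made for this sketch

    # JoinGroupResponse: only the leader receives the full member list.
    join_responses = {
        member_id: {
            "generation_id": generation_id,
            "leader_id": leader_id,
            "member_id": member_id,
            "members": dict(subscriptions) if member_id == leader_id else {},
        }
        for member_id in member_ids
    }

    # The leader computes the assignment from the members it was given.
    members = sorted(join_responses[leader_id]["members"])
    plan = {member_id: [] for member_id in members}
    for index, partition in enumerate(sorted(partitions)):
        plan[members[index % len(members)]].append(partition)

    # SyncGroupRequest: the leader carries the plan, the others send nothing.
    sync_requests = {
        member_id: {"member_assignment": plan if member_id == leader_id else {}}
        for member_id in member_ids
    }

    # SyncGroupResponse: the coordinator hands each member its own share.
    leader_plan = sync_requests[leader_id]["member_assignment"]
    sync_responses = {
        member_id: {"error_code": 0, "member_assignment": leader_plan[member_id]}
        for member_id in member_ids
    }
    return join_responses, sync_responses


joins, syncs = run_rebalance(
    {"consumer-1": ["orders"], "consumer-2": ["orders"]},
    [("orders", 0), ("orders", 1), ("orders", 2)],
)
print(syncs["consumer-1"]["member_assignment"])
```

Computing the assignment on a consumer rather than on the coordinator means the allocation strategy lives on the client side, with the coordinator only relaying the result.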

5.4.3 Partition allocation Policy

Kafka provides three partition allocation strategies: RangeAssignor, RoundRobinAssignor, and StickyAssignor. The implementation of each algorithm is briefly described below.

  1. RangeAssignor: Kafka uses this policy by default. The process is as follows:

    • Sort the partitions of all subscribed topics to get the collection TP={TP0,TP1,...,TP(N-1)}.
    • Sort all consumers in the consumer group lexicographically by name to get the collection CG={C0,C1,...,C(M-1)}.
    • Calculate D=N/M and R=N%M.
    • Consumer Ci starts at position D*i+min(i,R) and is assigned D+(if (i+1>R) 0 else 1) partitions.

    Suppose that there are two consumers {C0,C1} in a consumer group, and the consumer group subscribes to three topics {T1,T2,T3}, each with three partitions, for a total of nine partitions {TP1,TP2,...,TP9}. Applying the formula to all nine partitions gives D=4 and R=1, so consumer C0 consumes {TP1,TP2,TP3,TP4,TP5} and C1 consumes {TP6,TP7,TP8,TP9}. The problem is that if the partitions cannot be divided evenly, the first few consumers each consume one more partition. (Kafka's implementation applies this calculation topic by topic, so the skew can accumulate across topics.)

  2. RoundRobinAssignor: this policy requires the following two conditions: 1) all consumers in the consumer group subscribe to the same topic; 2) all consumers in the same consumer group specify the same number of streams for each topic when they are instantiated.

    • Sort all partitions of all topics by the hash value obtained from topic + partition.
    • Sort all consumers lexicographically.
    • Assign partitions to consumers in round-robin order.
  3. StickyAssignor: introduced in version 0.11, this policy has two goals: 1) keep the allocation as balanced as possible; 2) on reassignment, retain as much of the existing allocation as possible. The first goal has a higher priority than the second.
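
The first two strategies are short enough to sketch directly. The functions below are illustrative, not the Kafka classes: range_assign applies the D and R formula to one sorted list of partitions exactly as written above, and round_robin_assign sorts partitions by name rather than by hash so that the output is easy to follow.

```python
def range_assign(partitions, consumers):
    """Give each consumer a contiguous range using D = N / M and R = N % M."""
    ordered_partitions = sorted(partitions)
    ordered_consumers = sorted(consumers)
    d, r = divmod(len(ordered_partitions), len(ordered_consumers))
    assignment = {}
    for i, consumer in enumerate(ordered_consumers):
        start = d * i + min(i, r)
        length = d + (0 if i + 1 > r else 1)
        assignment[consumer] = ordered_partitions[start:start + length]
    return assignment


def round_robin_assign(partitions, consumers):
    """Deal the sorted partitions out to the sorted consumers one at a time."""
    ordered_partitions = sorted(partitions)
    ordered_consumers = sorted(consumers)
    assignment = {consumer: [] for consumer in ordered_consumers}
    for index, partition in enumerate(ordered_partitions):
        owner = ordered_consumers[index % len(ordered_consumers)]
        assignment[owner].append(partition)
    return assignment


nine_partitions = [f"TP{i}" for i in range(1, 10)]
print(range_assign(nine_partitions, ["C0", "C1"]))
print(round_robin_assign(nine_partitions, ["C0", "C1"]))
```

For the nine-partition example, range_assign gives C0 the partitions TP1 to TP5 and C1 the partitions TP6 to TP9, matching the result in the text, while round_robin_assign alternates between the two consumers.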

6. Summary

In this article we started from the characteristics of Kafka and described its implementation principles in detail. An in-depth look at topics and logs explained how Kafka stores, retrieves, and deletes messages internally. In the replica mechanism, the ISR addresses the shortcomings of both synchronous and asynchronous replication, and the leader epoch mechanism addresses data loss and data inconsistency. On the producer side, the partition selection algorithm balances data across partitions, and idempotence solves the problem of duplicate messages.

Finally, the principles of the consumer side were introduced. The consumer group mechanism isolates messages between consumers and supports both broadcast and exclusive scenarios, and the rebalance mechanism ensures the robustness and scalability of the consumer side.
