Replicas, ISR, and consistency
In Kafka, when we create a topic, the replication-factor parameter specifies the number of replicas.
Create a topic:
kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181/kafka --create --topic test007 --partitions 2 --replication-factor 3
After the topic is created, you can view its partition, replica, and ISR information with kafka-topics.sh:
kafka-topics.sh --zookeeper node01:2181,node02:2181,node03:2181/kafka --describe --topic test007
Topic:test007 PartitionCount:2 ReplicationFactor:3 Configs:
Topic: test007 Partition: 0 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test007 Partition: 1 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
As the describe output shows, each partition has a Leader, a replica list (Replicas), and an in-sync replica list (Isr). With multiple replicas, Kafka must synchronize data from the leader to its followers to keep the copies consistent. Two synchronization strategies are common:
- Majority acknowledgment: the ack is sent once more than half of the replicas have synchronized the data. This is the most common approach in distributed consensus, but it needs more replicas to tolerate the same number of failures.
- Full acknowledgment: the ack is sent only after all replicas have synchronized the data. This costs more latency, since the slowest replica gates the ack.
Kafka chose the second strategy (ack only after all in-sync replicas have completed synchronization), for the following reasons:
- To tolerate the failure of n nodes, the first scheme requires 2n + 1 replicas, while the second requires only n + 1. Each partition in Kafka can hold a large amount of data, so the first scheme would create heavy data redundancy.
- Although the second scheme has higher network latency, latency has less impact on Kafka's use cases.
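The replica-count arithmetic behind the first reason can be sketched as a small helper (the method names here are my own, purely for illustration):

```java
public class QuorumMath {
    // Majority scheme: a write is acked once more than half the replicas have it.
    // A majority must survive, so tolerating n failed nodes requires 2n + 1 replicas.
    static int replicasForMajority(int n) {
        return 2 * n + 1;
    }

    // Full-sync scheme (Kafka's choice): the ack waits for every in-sync replica,
    // so tolerating n failed nodes requires only n + 1 replicas.
    static int replicasForFullSync(int n) {
        return n + 1;
    }

    public static void main(String[] args) {
        for (int n = 1; n <= 3; n++) {
            System.out.println("tolerate " + n + " failures: majority needs "
                    + replicasForMajority(n) + " replicas, full sync needs "
                    + replicasForFullSync(n));
        }
    }
}
```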
In Kafka, the leader maintains a dynamic In-Sync Replica set (ISR): the set of followers that are currently in sync with the leader. A follower that falls too far behind (governed by replica.lag.time.max.ms) is removed from the ISR, and rejoins once it catches up again. For a deeper look, see the article "Kafka ACK/ISR/HW".
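How a follower drops out of the ISR can be shown with a toy model (the class and data below are invented for illustration; the real broker tracks per-replica lag against replica.lag.time.max.ms internally):

```java
import java.util.*;

// Toy model of ISR maintenance: a follower stays in the ISR only if it has
// caught up with the leader within the lag window (replica.lag.time.max.ms).
public class IsrModel {
    static List<Integer> computeIsr(Map<Integer, Long> lastCaughtUpMs,
                                    long nowMs, long maxLagMs) {
        List<Integer> isr = new ArrayList<>();
        for (Map.Entry<Integer, Long> e : lastCaughtUpMs.entrySet()) {
            if (nowMs - e.getValue() <= maxLagMs) {
                isr.add(e.getKey());
            }
        }
        Collections.sort(isr);
        return isr;
    }

    public static void main(String[] args) {
        Map<Integer, Long> lastCaughtUp = new HashMap<>();
        lastCaughtUp.put(0, 1_000L);  // broker 0 last caught up 1 s into the run
        lastCaughtUp.put(1, 9_500L);  // broker 1 caught up recently
        lastCaughtUp.put(2, 9_900L);  // broker 2 (the leader) is always current
        // At t = 10 s with a 5 s lag window, broker 0 falls out of the ISR.
        System.out.println(computeIsr(lastCaughtUp, 10_000L, 5_000L)); // [1, 2]
    }
}
```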
Index files
After the topic is created, Kafka creates one data directory per partition under log.dir: here test007-0 and test007-1, one for each of the two partitions. Entering the test007-0 directory, for example, shows the following files:
Among them, 00000000000000000000.log holds the message data, while 00000000000000000000.index and 00000000000000000000.timeindex are its index files.
You can run the kafka-dump-log.sh command provided by Kafka to view file contents.
kafka-dump-log.sh --files 00000000000000000000.log
Dumping 00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 0 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1625896634163 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 1367333611
baseOffset: 1 lastOffset: 1 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 81 CreateTime: 1625896634401 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 593410223
baseOffset: 2 lastOffset: 2 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 162 CreateTime: 1625896634433 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 1448478144
baseOffset: 3 lastOffset: 3 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 243 CreateTime: 1625896634465 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 3001936851
baseOffset: 4 lastOffset: 4 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 324 CreateTime: 1625896634483 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 2645175270
baseOffset: 5 lastOffset: 5 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 405 CreateTime: 1625896634522 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 4146059855
baseOffset: 6 lastOffset: 6 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 486 CreateTime: 1625896634550 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 3625250854
baseOffset: 7 lastOffset: 7 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 567 CreateTime: 1625896634570 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 2732649947
baseOffset: 8 lastOffset: 8 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 648 CreateTime: 1625896634579 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 1803337061
baseOffset: 9 lastOffset: 9 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 729 CreateTime: 1625896634594 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 54002002
baseOffset: 10 lastOffset: 10 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 810 CreateTime: 1625896634617 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 839740742
baseOffset: 11 lastOffset: 11 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 891 CreateTime: 1625896634656 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 3114671675
baseOffset: 12 lastOffset: 12 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 972 CreateTime: 1625896634699 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 512924486
baseOffset: 13 lastOffset: 13 count: 1 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 1053 CreateTime: 1625896634733 isvalid: true size: 81 magic: 2 compresscodec: NONE crc: 2870394865
Each batch in the .log dump shows its offset range, size, CreateTime, and other metadata. The two index files can be dumped the same way:
kafka-dump-log.sh --files 00000000000000000000.index
Dumping 00000000000000000000.index
offset: 51 position: 4131
offset: 102 position: 8262
offset: 153 position: 12393
offset: 204 position: 16524
offset: 255 position: 20655
offset: 306 position: 24786
offset: 357 position: 28917
offset: 408 position: 33048
offset: 459 position: 37179
offset: 510 position: 41310
offset: 561 position: 45441
kafka-dump-log.sh --files 00000000000000000000.timeindex
Dumping 00000000000000000000.timeindex
timestamp: 1625896635654 offset: 51
timestamp: 1625896636752 offset: 102
timestamp: 1625896637830 offset: 153
timestamp: 1625896639150 offset: 204
timestamp: 1625896639729 offset: 255
timestamp: 1625897012620 offset: 306
timestamp: 1625897013413 offset: 357
timestamp: 1625897013954 offset: 408
timestamp: 1625897014458 offset: 459
timestamp: 1625897015002 offset: 510
timestamp: 1625897015462 offset: 561
timestamp: 1625897016150 offset: 612
The dumps above show that the indexes are sparse. When a client API fetches data, it locates messages through them: given a target offset, the .index file maps an indexed offset to a byte position in the .log file; a seek to that position, followed by a short forward scan, reaches the exact record. The .timeindex file adds one more hop: given a timestamp, it yields an offset, and that offset is then resolved to a position through the .index file as above.
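The offset lookup amounts to a floor search over the sparse index. A simplified sketch, using entries from the .index dump above (this is an illustration, not Kafka's actual OffsetIndex implementation):

```java
// Simplified model of Kafka's sparse .index lookup: find the greatest indexed
// offset <= the target offset, then scan the .log forward from that position.
public class SparseIndex {
    // (offset, position) pairs as dumped from 00000000000000000000.index
    static final long[][] ENTRIES = {
        {51, 4131}, {102, 8262}, {153, 12393}, {204, 16524}, {255, 20655}
    };

    // Binary search for the floor entry; returns the file position to seek to
    // (position 0 when the target precedes the first indexed offset).
    static long positionFor(long targetOffset) {
        long pos = 0;
        int lo = 0, hi = ENTRIES.length - 1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (ENTRIES[mid][0] <= targetOffset) {
                pos = ENTRIES[mid][1];   // candidate floor entry; look right
                lo = mid + 1;
            } else {
                hi = mid - 1;            // too far; look left
            }
        }
        return pos;
    }

    public static void main(String[] args) {
        System.out.println(positionFor(100)); // 4131: seek there, scan on to offset 100
        System.out.println(positionFor(153)); // 12393: exact index hit
    }
}
```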
Producer acks
The acks setting is the number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are allowed (quoted from the official documentation):
- acks=0: the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
- acks=1: the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case, should the leader fail immediately after acknowledging the record but before the followers have replicated it, the record will be lost.
- acks=all: the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee. It is equivalent to the acks=-1 setting.
Kafka's default acks value is 1: the leader returns the ack as soon as it has written the record locally. A more detailed description is available in "Kafka producer ACK mechanism analysis". In producer code, acks is configured through ProducerConfig.ACKS_CONFIG. The effect:
- pro.setProperty(ProducerConfig.ACKS_CONFIG, "0");
key:item-1, value:value-0, partition:1, offset:-1
key:item-2, value:value-0, partition:1, offset:-1
key:item-0, value:value-1, partition:0, offset:-1
key:item-1, value:value-1, partition:1, offset:-1
key:item-2, value:value-1, partition:1, offset:-1
key:item-0, value:value-2, partition:0, offset:-1
key:item-1, value:value-2, partition:1, offset:-1
key:item-2, value:value-2, partition:1, offset:-1
key:item-0, value:value-0, partition:0, offset:-1
key:item-1, value:value-0, partition:1, offset:-1
- pro.setProperty(ProducerConfig.ACKS_CONFIG, "-1");
key:item-0, value:value-1, partition:0, offset:10009
key:item-1, value:value-1, partition:1, offset:20017
key:item-2, value:value-1, partition:1, offset:20018
key:item-0, value:value-2, partition:0, offset:10010
key:item-1, value:value-2, partition:1, offset:20019
key:item-2, value:value-2, partition:1, offset:20020
key:item-0, value:value-0, partition:0, offset:10011
key:item-1, value:value-0, partition:1, offset:20021
key:item-2, value:value-0, partition:1, offset:20022
key:item-0, value:value-1, partition:0, offset:10012
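A minimal configuration sketch for runs like these (stdlib only; "acks" is the literal key behind ProducerConfig.ACKS_CONFIG, and the broker address is a placeholder):

```java
import java.util.Properties;

// Producer settings for the two acks experiments. With the kafka-clients
// library on the classpath, these Properties would be passed to
// new KafkaProducer<>(props).
public class AcksConfig {
    static Properties producerProps(String acks) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "node01:9092"); // placeholder broker list
        props.setProperty("acks", acks);                       // "0", "1", or "all"/"-1"
        props.setProperty("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        // acks=0: fire-and-forget, so returned offsets are always -1
        System.out.println(producerProps("0").getProperty("acks"));
        // acks=-1 (all): waits for the full ISR, and real offsets come back
        System.out.println(producerProps("-1").getProperty("acks"));
    }
}
```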
Locating the consumption offset by timestamp
HashMap<TopicPartition, Long> tts = new HashMap<>();
Set<TopicPartition> tps = consumer.assignment();
// Poll until the consumer has been assigned its partitions
while (tps.size() == 0) {
    consumer.poll(Duration.ofMillis(100));
    tps = consumer.assignment();
}
// Ask for the offset closest to "24 hours ago" in each assigned partition
for (TopicPartition partition : tps) {
    tts.put(partition, System.currentTimeMillis() - 24 * 3600 * 1000);
}
Map<TopicPartition, OffsetAndTimestamp> forTimes = consumer.offsetsForTimes(tts);
// Seek each partition to the offset resolved for its timestamp
for (TopicPartition partition : tps) {
    OffsetAndTimestamp offsetAndTimestamp = forTimes.get(partition);
    long offset = offsetAndTimestamp.offset();
    consumer.seek(partition, offset);
}