First, the producer principle

1. Producer sends messages

To understand how the Producer sends messages, let's first look at the following figure.

Next, we analyze the message sending process of Producer according to the steps in the figure.

1.1. Create KafkaProducer

The Producer runs primarily through the coordination of two threads: the main thread and the Sender thread.

 KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

When a KafkaProducer is created, it creates a Sender and starts an I/O thread (see KafkaProducer.java in kafka-clients-2.6.0.jar).

1.2. Interceptor (ProducerInterceptor)

Interceptors run when the producer calls the send method. Looking at the kafka-clients source code, we can see the following code in KafkaProducer.java:

The role of the interceptor here is similar to AOP: it lets you run extra logic before and after a message is sent.

Next, let's look at how to use a ProducerInterceptor:

  1. Implement the org.apache.kafka.clients.producer.ProducerInterceptor interface and its methods.
  2. Configure the interceptor.classes parameter on the producer.

Usage example

Creating the interceptor

import java.util.Map;
import org.apache.kafka.clients.producer.ProducerInterceptor;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class TestInterceptor implements ProducerInterceptor<String, String> {
    // Called before the message is sent
    @Override
    public ProducerRecord<String, String> onSend(ProducerRecord<String, String> record) {
        System.out.println("Message about to be sent !!!!");
        return record;
    }

    // Called when an ACK is received from the server (or the send fails)
    @Override
    public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
        System.out.println("Message received by server");
    }

    // Called when the producer is closed
    @Override
    public void close() {
        System.out.println("The producer is closed.");
    }

    // Called with the producer's configuration key-value pairs
    @Override
    public void configure(Map<String, ?> configs) {
        System.out.println("configure...");
    }
}

Configuring it on the producer

Properties props = new Properties();
props.put("bootstrap.servers", "127.0.0.1:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "1");

// Add interceptor
List<String> interceptors = new ArrayList<>();
interceptors.add("com.testkafka.interceptor.TestInterceptor");
// props.put(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, interceptors);
props.put("interceptor.classes", interceptors);
Producer<String, String> producer = new KafkaProducer<String, String>(props);
producer.send(new ProducerRecord<String, String>("mytopic", "1", "1"));

1.3. Serializer

After the interceptors have run, the key and value are serialized. The corresponding code is in KafkaProducer.java.

Kafka provides serializers for the common data types:

ByteArraySerializer
ByteBufferSerializer
BytesSerializer
DoubleSerializer
FloatSerializer
IntegerSerializer
LongSerializer
ShortSerializer
StringSerializer
UUIDSerializer
VoidSerializer

In addition to the built-in serializers, you can use a custom serializer for formats such as Avro, JSON, Thrift, and Protobuf. A custom serializer must implement the Serializer interface.

See the following link for the code:

Gitee.com/fanger8848/…

1.4. Routing (Partitioner)

The corresponding code is in KafkaProducer.java (kafka-clients-2.6.0.jar).

The partition() method here returns the partition number, starting at 0.

Which partition will a message be sent to?

  1. A partition is specified.
  2. No partition is specified, but a custom partitioner is configured.
  3. No partition is specified and no custom partitioner is configured, but the key is not empty.
  4. No partition is specified, no custom partitioner is configured, and the key is empty.

Now, let's analyze them one at a time.

First case: a partition is specified, so the message is sent to that partition.

Code sample

Second case: no partition is specified, but a custom partitioner is configured. The partition number is then decided by the custom partitioner.

A custom partitioner must implement the org.apache.kafka.clients.producer.Partitioner interface and override the partition() method.

The Producer also needs to be configured with the partitioner class:

props.put("partitioner.class", "com.fanger.partition.SimplePartitioner");

Code sample

Third case: no partition is specified and no custom partitioner is configured, but the key is not empty.

If no partition is specified but a key exists, the DefaultPartitioner hashes the key and takes the result modulo the number of partitions in the topic to obtain the partition.

Fourth case: no partition is specified, no custom partitioner is configured, and the key is empty.

When there is neither a partition nor a key, an integer is randomly generated on the first call and incremented on each subsequent call; the partition is obtained by taking this value modulo the number of available partitions of the topic. This is the round-robin algorithm. Note that since kafka-clients 2.4 the DefaultPartitioner handles keyless records with a sticky strategy instead: it keeps using one partition until the current batch is complete and then switches to another.
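The four cases can be sketched as one decision function. This is a simplified, self-contained illustration rather than Kafka's code: the class PartitionChoiceSketch and its method are hypothetical, Arrays.hashCode stands in for the murmur2 hash that the DefaultPartitioner applies to the serialized key, and the keyless branch follows the round-robin description (with the counter seeded at 0 instead of a random value so the result is predictable).

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.ToIntFunction;

class PartitionChoiceSketch {
    // Counter used for keyless records; seeded with 0 here to keep the sketch deterministic.
    private final AtomicInteger counter = new AtomicInteger(0);

    int choose(Integer explicitPartition, ToIntFunction<String> customPartitioner,
               String key, int numPartitions) {
        if (explicitPartition != null) {
            return explicitPartition;                      // case 1: partition given by the caller
        }
        if (customPartitioner != null) {
            return customPartitioner.applyAsInt(key);      // case 2: custom partitioner decides
        }
        if (key != null) {                                 // case 3: hash of the key mod partition count
            int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
            return (hash & 0x7fffffff) % numPartitions;
        }
        // case 4: incrementing counter mod partition count (round-robin)
        return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
    }
}
```

Masking with 0x7fffffff keeps the value non-negative before the modulo, so the result is always a valid partition index.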

1.5. Message accumulator

Looking at the source code, we can see that the message is not sent immediately after the partition is chosen. It is appended to the RecordAccumulator.

The message accumulator is essentially a ConcurrentMap that maps each TopicPartition to a Deque of ProducerBatch objects.

  private final ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches;

When the ProducerBatch of a partition is full (batch.size) or its wait time is up (linger.ms), the Sender thread is woken up to send the messages.
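The batching idea can be sketched with plain collections. This is a hypothetical model, not the RecordAccumulator itself: AccumulatorSketch uses a String as the topic-partition key, a List of Strings as a batch, counts records instead of bytes, and leaves out the linger timeout.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class AccumulatorSketch {
    private final int batchSize; // max records per batch (Kafka limits batches by bytes)
    private final ConcurrentMap<String, Deque<List<String>>> batches = new ConcurrentHashMap<>();

    AccumulatorSketch(int batchSize) {
        this.batchSize = batchSize;
    }

    // Appends to the newest batch of the partition, opening a new batch when it is full.
    // Returns true when a full batch is waiting, i.e. the sender should be woken up.
    boolean append(String topicPartition, String record) {
        Deque<List<String>> deque =
                batches.computeIfAbsent(topicPartition, tp -> new ArrayDeque<List<String>>());
        synchronized (deque) {
            List<String> last = deque.peekLast();
            if (last == null || last.size() >= batchSize) {
                last = new ArrayList<String>();
                deque.addLast(last);
            }
            last.add(record);
            return last.size() >= batchSize || deque.size() > 1;
        }
    }

    // Called by the sender side: removes and returns the oldest batch, or null if none exists.
    List<String> drain(String topicPartition) {
        Deque<List<String>> deque = batches.get(topicPartition);
        if (deque == null) {
            return null;
        }
        synchronized (deque) {
            return deque.pollFirst();
        }
    }
}
```

Keeping a deque per partition lets the main thread keep appending at the tail while the sender drains from the head.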

2. The server responds to ACK

The Producer can configure acks to set the acknowledgement policy of the server.

  1. acks=0: the Producer does not wait for any acknowledgement from the Broker; the message is treated as sent as soon as it is handed to the network (it may not yet be written to disk). The latency is minimal, but the risk of losing data is high if the Broker fails.
  2. acks=1: an ACK is returned after the leader of the Partition has written the message to its log. If the leader fails before the followers have replicated the message, the data is lost.
  3. acks=-1 (all): the ACK is returned only after the leader and all in-sync followers of the Partition have written the message.

Partition leaders and followers are covered in the section on Broker storage below.

Second, Broker storage principles

1. File storage structure

1.1. Broker storage model analysis

How is a Topic stored across multiple Brokers? From the figure above, we can see that Topic 1 is divided into three partitions, namely P1, P2 and P3.

In addition, each Partition has two replicas.

In the figure above, P1 with the red background is the leader replica of Partition 1, and P1 with the blue background is a follower replica. The consumer reads data only from the leader, which avoids the data consistency issues associated with master-slave replication.

1.2. Replica distribution

1.2.1. How to view the replica distribution

Run the following command to view the replicas of each partition:

sh kafka-topics.sh --topic 3part3rep-1 --describe --zookeeper 192.168.0.101:2181,192.168.0.102:2181,192.168.0.103:2181

View the result as shown below:

1.2.2. Replica distribution rules

  1. First, the replication factor cannot be greater than the number of Brokers.
  2. The first replica of the first partition (the partition numbered 0) is placed on a Broker selected at random from the list of Brokers.
  3. The first replica of each other partition is shifted one position further relative to partition 0. For example, with five Brokers and five partitions, if the first replica of the first partition is placed on the fourth Broker, the first replica of the second partition is placed on the fifth Broker, the first replica of the third partition on the first Broker, the first replica of the fourth partition on the second Broker, and so on.
  4. The remaining replicas of each partition are placed relative to the first replica, at a distance determined by nextReplicaShift, and this number is also random.

Typically, the first replica of a partition is the leader of that partition's replicas.
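The placement rules can be sketched as follows. This is a simplified model of rack-unaware assignment: the class name is hypothetical, brokers are identified by their index in the broker list, and the two random values (the starting broker and nextReplicaShift) are passed in as parameters so the output is reproducible.

```java
import java.util.ArrayList;
import java.util.List;

class ReplicaAssignmentSketch {
    // Returns, for each partition, the broker indexes holding its replicas (first entry = first replica).
    static List<List<Integer>> assign(int nBrokers, int nPartitions, int replicationFactor,
                                      int startIndex, int nextReplicaShift) {
        if (replicationFactor > nBrokers) {
            throw new IllegalArgumentException("replication factor exceeds number of brokers");
        }
        List<List<Integer>> result = new ArrayList<List<Integer>>();
        int shift = nextReplicaShift;
        for (int p = 0; p < nPartitions; p++) {
            if (p > 0 && p % nBrokers == 0) {
                shift++; // vary follower placement after each full pass over the brokers
            }
            int first = (p + startIndex) % nBrokers; // rules 2 and 3: first replicas move one broker at a time
            List<Integer> replicas = new ArrayList<Integer>();
            replicas.add(first);
            for (int j = 0; j < replicationFactor - 1; j++) {
                int distance = 1 + (shift + j) % (nBrokers - 1); // rule 4: distance from the first replica
                replicas.add((first + distance) % nBrokers);
            }
            result.add(replicas);
        }
        return result;
    }
}
```

With 5 brokers, 5 partitions and startIndex 3 (the fourth Broker), the first replicas land on broker indexes 3, 4, 0, 1, 2, matching the example in rule 3.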

1.3. Segment analysis

1.3.1. Segment file structure analysis

A partition is divided into multiple segments so that the log file does not grow without bound as messages are appended, which would make message retrieval inefficient.

Each partition contains a set of files as shown in the following figure:

  • The .log file stores the messages.
  • The .index file is the offset index file.
  • The .timeindex file is the timestamp index file.

As messages are appended, the log is rolled and a new set of segment files is created when certain conditions are met. The conditions for rolling a segment are as follows:

First case: the .log file reaches a size limit, controlled by the following parameter. The default value is 1073741824 bytes (1 GB).

log.segment.bytes=1073741824

Second case: the difference between the current system timestamp and the maximum message timestamp exceeds a limit, controlled by the following parameters. The default value is 168 hours (one week).

log.roll.hours=168
# It can also be set in milliseconds
log.roll.ms

Third case: the offset index file or the timestamp index file reaches a size limit. The default value is 10485760 bytes (10 MB). To roll log segments less often, you can increase this value.

log.index.size.max.bytes=10485760

1.3.2. Offset index file

The offset index file records the mapping between a message's offset and its physical position in the log file. It is a binary file; you can view its content with the following command:

./kafka-dump-log.sh --files /tmp/kafka-logs/topic1-0/00000000000000000000.index|head -n 10

The query content is as follows:

Note that Kafka does not create an index entry for every message; it uses a sparse index. The sparse index structure is shown in the figure below.

How is the sparsity of the sparse index determined?

The sparsity of the offset index is determined by the amount of message data written between two entries, which defaults to 4 KB and is controlled by the following parameter:

log.index.interval.bytes=4096

Each time more than 4 KB of message data has been written since the last entry, one more entry is added to the offset index.
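A minimal sketch of this rule, assuming one message per append and a hypothetical class SparseIndexSketch (Kafka applies the check per appended batch): an index entry is written only when the bytes accumulated since the previous entry exceed the interval.

```java
import java.util.ArrayList;
import java.util.List;

class SparseIndexSketch {
    private final int indexIntervalBytes; // plays the role of log.index.interval.bytes
    private int bytesSinceLastEntry = 0;
    private long nextOffset = 0;
    private int position = 0;             // byte position in the .log file
    final List<long[]> entries = new ArrayList<long[]>(); // each entry is {offset, position}

    SparseIndexSketch(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    void append(int messageSize) {
        if (bytesSinceLastEntry > indexIntervalBytes) {
            // Enough data has been written since the last entry: index this message.
            entries.add(new long[] {nextOffset, position});
            bytesSinceLastEntry = 0;
        }
        position += messageSize;
        bytesSinceLastEntry += messageSize;
        nextOffset++;
    }
}
```

With a 4096-byte interval and 1000-byte messages, only every fifth message gets an index entry, which is why a lookup has to finish with a short scan of the log file.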

1.3.3. Timestamp index file

There are two kinds of timestamp that can be indexed: the time the message was created, and the time it was appended to the log on the Broker. Which one is used is controlled by a single parameter:

# CreateTime: the time the message was created (default); LogAppendTime: the time it was appended to the log
log.message.timestamp.type=CreateTime

To view the earliest 10 timestamp indexes, run the following command:

./kafka-dump-log.sh --files /tmp/kafka-logs/topic1-0/00000000000000000000.timeindex|head -n 10

The result is as follows; it records the mapping between timestamps and offsets.

1.3.4. How to quickly retrieve messages?

Let’s say I want to retrieve a message with an offset of 10002673.

  1. When consuming, the partition is already known, so the first step is to find the segment that holds the offset. Segment files are named after their base offset, so binary search finds it quickly (the segment with the largest base offset that is not greater than 10002673).
  2. Each segment has its own set of index files. Look up the offset in the .index file to find the nearest indexed entry at or before it, which gives a position.
  3. Starting from that position in the corresponding .log file, scan forward and compare each message's offset until the message is found.
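The first two steps can be sketched with sorted maps. This is an illustrative model with hypothetical names: segments are keyed by base offset, each segment holds a sparse index from offset to byte position, and TreeMap.floorEntry does the "largest key not greater than the target" search.

```java
import java.util.Map;
import java.util.TreeMap;

class OffsetLookupSketch {
    // base offset of each segment -> that segment's sparse index (offset -> byte position)
    final TreeMap<Long, TreeMap<Long, Integer>> segments = new TreeMap<Long, TreeMap<Long, Integer>>();

    // Returns {segment base offset, byte position to start scanning from},
    // or null when the target offset is below every segment.
    long[] locate(long targetOffset) {
        Map.Entry<Long, TreeMap<Long, Integer>> segment = segments.floorEntry(targetOffset);
        if (segment == null) {
            return null;
        }
        Map.Entry<Long, Integer> indexEntry = segment.getValue().floorEntry(targetOffset);
        // No index entry at or below the target: scan from the start of the segment.
        int position = (indexEntry == null) ? 0 : indexEntry.getValue();
        return new long[] {segment.getKey(), position};
    }
}
```

The third step, scanning the .log file from the returned position, is left out because it depends on the on-disk record format.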


2. Message retention (cleanup) mechanism

2.1. Switches and Policies

You can set the following parameter to enable the log cleaner:

log.cleaner.enable=true

Kafka provides two cleanup policies:

  • delete: remove old log segments
  • compact: log compaction

The default policy is delete, and it can be configured:

log.cleanup.policy=delete

2.2. Delete policy

log.retention.check.interval.ms: sets how often the check for deletable logs runs. The default is 5 minutes.

log.retention.hours: sets a retention time (168 hours by default). Messages that have been kept longer than this are eligible for deletion. The time can also be set in minutes or milliseconds, with log.retention.minutes (default null) and log.retention.ms.

log.retention.bytes: sets a size threshold for a partition's log (the total size of all its segment files). When the threshold is exceeded, the oldest segments are deleted. The default value is -1, which means the size is not limited.
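The two retention checks can be sketched as follows. This is a simplified model with hypothetical names: each segment is a pair {largest timestamp in ms, size in bytes}, segments are ordered oldest first, and the special handling of the active segment is left out.

```java
class RetentionSketch {
    // Time-based: counts the leading segments whose newest message is older than retentionMs.
    static int deletableByTime(long[][] segments, long nowMs, long retentionMs) {
        int count = 0;
        for (long[] segment : segments) {
            if (nowMs - segment[0] > retentionMs) {
                count++;
            } else {
                break; // segments are ordered by age, so stop at the first one still retained
            }
        }
        return count;
    }

    // Size-based: counts the leading segments that can be removed while the total stays at or above
    // retentionBytes. A negative retentionBytes means unlimited.
    static int deletableBySize(long[][] segments, long retentionBytes) {
        if (retentionBytes < 0) {
            return 0;
        }
        long total = 0;
        for (long[] segment : segments) {
            total += segment[1];
        }
        long excess = total - retentionBytes;
        int count = 0;
        for (long[] segment : segments) {
            if (excess - segment[1] >= 0) {
                excess -= segment[1];
                count++;
            } else {
                break;
            }
        }
        return count;
    }
}
```

Deletion works on whole segments, so the log can stay somewhat above the size threshold until a full segment fits inside the excess.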

2.3. Compact

Compaction merges records with the same key and keeps only the latest value for each key, as shown in the figure below.
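The effect of compaction can be sketched in a few lines. This is an illustration of the result only, with a hypothetical class name: records are {key, value} pairs, and details such as delete markers and which segments are eligible are ignored.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CompactionSketch {
    // Keeps only the last value written for each key, ordered by where that last write occurred.
    static List<String[]> compact(List<String[]> log) {
        Map<String, String> latest = new LinkedHashMap<String, String>();
        for (String[] record : log) {
            latest.remove(record[0]);          // drop the older value so the key moves to its newest position
            latest.put(record[0], record[1]);
        }
        List<String[]> compacted = new ArrayList<String[]>();
        for (Map.Entry<String, String> e : latest.entrySet()) {
            compacted.add(new String[] {e.getKey(), e.getValue()});
        }
        return compacted;
    }
}
```

For the input (k1,v1), (k2,v2), (k1,v3), (k3,v4), (k2,v5), the compacted log is (k1,v3), (k3,v4), (k2,v5).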

3. High availability principle

3.1. Controller election

First, what does a Controller do? The Controller is one Broker elected from among the Brokers. Its main tasks are as follows:

  • Listen for Broker changes
  • Monitor Topic changes
  • Listen for Partition changes
  • Retrieves and manages information about brokers, partitions, and topics
  • Manage the primary/secondary information of partitions

How is Controller selected?

At startup, every Broker tries to create the ephemeral node /controller in ZK, and only one of them succeeds (first come, first served). If the Controller crashes or has a network problem, the ephemeral node in ZK disappears. The other Brokers are watching this node, so they notice that the Controller is gone and compete to become the new Controller. The method is the same as before: whichever Broker writes the /controller node in ZK first becomes the new Controller.
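The "first to create the node wins" rule can be modelled in memory. This sketch does not use ZooKeeper: ControllerElectionSketch is a hypothetical class in which a ConcurrentHashMap stands in for ZK and putIfAbsent stands in for creating the ephemeral /controller node.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class ControllerElectionSketch {
    private final ConcurrentMap<String, Integer> zk = new ConcurrentHashMap<String, Integer>();

    // Returns true if this broker created /controller and therefore became the Controller.
    boolean tryBecomeController(int brokerId) {
        return zk.putIfAbsent("/controller", brokerId) == null;
    }

    // Models the ephemeral node disappearing when the Controller's session ends.
    void controllerSessionExpired() {
        zk.remove("/controller");
    }

    // Returns the current Controller's broker id, or null if there is none.
    Integer currentController() {
        return zk.get("/controller");
    }
}
```

Because putIfAbsent is atomic, exactly one broker succeeds even when several attempt it at the same moment, which mirrors why only one Broker can create the node.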

3.2. Leader election of partition replicas

Once a Controller exists, it can elect the leaders of the partition replicas. Here are a few concepts to know:

  • Assigned Replicas (AR): all replicas of a partition. All the princes.

  • In-Sync Replicas (ISR): the replicas that stay reasonably in sync with the leader's data. The princes who come to the morning meeting every day.

  • Out-of-Sync Replicas (OSR): the replicas that lag too far behind the leader. The prince who sleeps late, skips the morning meetings, and has fallen out of favor with the emperor.

AR = ISR + OSR. Under normal circumstances the OSR is empty, every replica is in sync, and AR = ISR. If a replica lags by more than 30 seconds (replica.lag.time.max.ms), it is removed from the ISR and moved to the OSR. Once it catches up, it rejoins the ISR.

How are leader replicas elected?

The election is run by the Controller, using an approach similar to Microsoft's PacificA algorithm. In this algorithm, by default, the first replica in the ISR becomes the leader. For example, if the ISR is 1, 5, 9, replica 1 is chosen first.
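A minimal sketch of this selection rule, under the assumption that candidates are tried in the order of the assigned replica list and must be both in the ISR and alive. The class and method names are hypothetical.

```java
import java.util.List;
import java.util.Set;

class LeaderElectionSketch {
    // Returns the first assigned replica that is in the ISR and on a live broker, or -1 if none qualifies.
    static int electLeader(List<Integer> assignedReplicas, Set<Integer> isr, Set<Integer> liveBrokers) {
        for (int replica : assignedReplicas) {
            if (isr.contains(replica) && liveBrokers.contains(replica)) {
                return replica;
            }
        }
        return -1;
    }
}
```

With replicas 1, 5, 9 all in the ISR, replica 1 is elected; if broker 1 is down, the choice falls to 5.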

What are the common election protocols (or consensus algorithms) in distributed systems?

ZAB (ZK) and Raft (Redis Sentinel), both variations of the Paxos algorithm, all boil down to first come, first served, with the minority deferring to the majority.

3.3. Partition replica synchronization mechanism

3.3.1. Replica synchronization mechanism

After the leader replica is elected, data needs to be synchronized to the follower replicas. How do followers synchronize data from the leader?

First we need to learn a few concepts, as shown below:

  • LEO: Log End Offset, the offset at which the next message will be written in a replica's log (one past its latest message).
  • High Watermark (HW): the smallest LEO among the ISR replicas; messages below it are committed.
  • Committed messages: messages that have been replicated by all ISR replicas.
  • Lagging messages: messages that have not yet been replicated by all ISR replicas.

What does data synchronization look like?

  1. The follower sends a fetch request to the leader. The leader sends data to the follower and updates its record of that follower's LEO.
  2. After receiving the response, the follower writes the messages and updates its own LEO.
  3. The leader updates the HW (the smallest LEO in the ISR).

Note that consumers can only consume messages below the HW. Kafka's ISR-based replication design provides high throughput while maintaining data consistency.
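The HW rule can be written down directly. This is a minimal sketch that treats LEO as the offset of the next message to be written, so offsets strictly below the HW are readable; the class and method names are hypothetical.

```java
import java.util.Collection;
import java.util.Collections;

class HighWatermarkSketch {
    // The HW is the smallest LEO among the in-sync replicas (the leader is always one of them).
    static long highWatermark(Collection<Long> isrLogEndOffsets) {
        return Collections.min(isrLogEndOffsets);
    }

    // A consumer may read an offset only if every ISR replica already has it.
    static boolean visibleToConsumer(long offset, long highWatermark) {
        return offset < highWatermark;
    }
}
```

If the leader's LEO is 10 and the two followers are at 8 and 9, the HW is 8, so offsets 0 to 7 can be consumed and 8 and 9 cannot yet.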

3.3.2 Follower failure

If a follower fails, the following steps are performed:

  1. The follower is removed from the ISR.
  2. After the follower recovers, it deletes the data above the HW it had previously recorded.
  3. It then synchronizes data from the leader until it catches up, and rejoins the ISR.

3.3.3. Leader failure

If the leader fails, the following steps are performed:

  1. The leader is removed from the ISR, and the Controller elects a new leader.
  2. The other followers delete the messages above the HW and then synchronize data from the new leader.

Note that this mechanism only guarantees consistency between replicas; it does not guarantee that data is never lost or duplicated.

Third, consumer principles

1. Maintain Offset

The consumer's consumption progress is stored in the __consumer_offsets topic. Two main objects are stored:

  • GroupMetadata: stores information about each consumer in a consumer group (each consumer is numbered).
  • OffsetAndMetadata: stores the offset and its metadata for each consumer group and partition.

What if the consumer cannot find an offset after joining the consumer group?

The consumer configuration has the auto.offset.reset parameter, which has the following options:

  • latest: consume from the latest message (the last one sent). Historical messages are not consumed.
  • earliest: consume from the earliest message (the first one sent). Historical messages can be consumed.
  • none: an error is reported if the consumer group cannot find an offset on the server.

When is the data in __consumer_offsets updated?

The data in __consumer_offsets is updated only when the consumer commits. You can configure the following parameters to choose between manual and automatic commits:

# Default is true. true means the offset is committed automatically after messages are consumed, and the Broker updates the consumer group's offset.
enable.auto.commit=true
# Controls how often offsets are committed automatically. The default is 5 seconds.
auto.commit.interval.ms=5000

If manual commit is selected, the following methods are available:

// Method 1: manual synchronous commit
consumer.commitSync();

// Method 2: manual asynchronous commit
consumer.commitAsync();

If a commit is not made or fails, the offset stored on the Broker is not updated, and the consumer group will consume the same messages again the next time it starts from the committed offset.
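The duplicate-consumption effect can be shown with a small in-memory model. This sketch does not use the Kafka consumer API: OffsetCommitSketch is a hypothetical class in which each call models a consumer starting from the committed offset, processing a batch, and optionally committing.

```java
import java.util.ArrayList;
import java.util.List;

class OffsetCommitSketch {
    private final List<String> partitionLog;
    private int committedOffset = 0; // what the Broker remembers for the consumer group

    OffsetCommitSketch(List<String> partitionLog) {
        this.partitionLog = partitionLog;
    }

    // Reads up to maxRecords starting at the committed offset.
    // The committed offset advances only when commit is true.
    List<String> pollAndProcess(int maxRecords, boolean commit) {
        int end = Math.min(committedOffset + maxRecords, partitionLog.size());
        List<String> batch = new ArrayList<String>(partitionLog.subList(committedOffset, end));
        if (commit) {
            committedOffset = end;
        }
        return batch;
    }
}
```

If the first call processes m0 and m1 without committing, the next call returns m0 and m1 again, which is why consumer logic should tolerate duplicates.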

2. Consumption strategy (relationship between consumers and partitions)

As you can see from the figure above, Kafka provides three strategies for assigning partitions to consumers:

  • RangeAssignor (default policy): assigns contiguous ranges of partitions to consumers in order.
  • RoundRobinAssignor: assigns partitions to consumers in turn.
  • StickyAssignor: this strategy is somewhat more complicated, but the result is relatively even (it may differ from one run to the next). Principles: 1) the distribution of partitions should be as even as possible; 2) partitions should stay with the consumer they were last assigned to as far as possible.

To select a policy, configure the following parameter on the consumer:

props.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RoundRobinAssignor");
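The first two strategies can be sketched for a single topic. This is a simplified illustration with hypothetical names: consumers are identified by index, and the real assignors also sort consumers and handle several topics, which is where the two strategies start to differ in balance.

```java
import java.util.ArrayList;
import java.util.List;

class AssignorSketch {
    // Range: each consumer gets a contiguous block; the first (partitions % consumers) consumers get one extra.
    static List<List<Integer>> range(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<List<Integer>>();
        int perConsumer = numPartitions / numConsumers;
        int extra = numPartitions % numConsumers;
        int next = 0;
        for (int c = 0; c < numConsumers; c++) {
            int count = perConsumer + (c < extra ? 1 : 0);
            List<Integer> assigned = new ArrayList<Integer>();
            for (int i = 0; i < count; i++) {
                assigned.add(next++);
            }
            result.add(assigned);
        }
        return result;
    }

    // Round robin: partitions are dealt out to consumers one at a time.
    static List<List<Integer>> roundRobin(int numPartitions, int numConsumers) {
        List<List<Integer>> result = new ArrayList<List<Integer>>();
        for (int c = 0; c < numConsumers; c++) {
            result.add(new ArrayList<Integer>());
        }
        for (int p = 0; p < numPartitions; p++) {
            result.get(p % numConsumers).add(p);
        }
        return result;
    }
}
```

For 5 partitions and 2 consumers, range gives [0, 1, 2] and [3, 4], while round robin gives [0, 2, 4] and [1, 3].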

3. Partition redistribution principle

There are two situations in which the assignment of partitions to consumers must be redone:

  • The number of consumers in the consumer group changes, for example when new consumers are added.

  • The number of partitions of the Topic changes, for example when partitions are added.

To keep the partition assignment as balanced as possible, these changes trigger the rebalance mechanism.

  1. Step one, find a referee to ensure fairness. Each Broker has an instance called GroupCoordinator that manages offsets and consumer groups. The first step is to pick the coordinator for the group from all the GroupCoordinators.
  2. Step two, take a head count. All the consumers connect to the GroupCoordinator and send it a join-group request.
  3. Step three, select a leader. The GroupCoordinator selects a leader from all the consumers. This consumer works out an assignment plan based on the consumers in the group and the configured strategy. The leader reports the plan to the GroupCoordinator, and the GroupCoordinator notifies all consumers.

Why is Kafka so fast?

The reasons are summarized as follows:

  1. Sequential disk writes (writing data)
  2. Zero copy (reading data)
  3. File indexes (segments, .index, .timeindex)
  4. Messages are read and written in batches and compressed to reduce network I/O overhead.

Configuration to ensure that Kafka messages are not lost

  1. On the Producer side, send with a callback using producer.send(msg, callback) instead of producer.send(msg). With the callback, a failed delivery can be detected and handled specifically.
  2. Set acks = all. acks is a Producer parameter that defines when a message counts as “committed”. If set to all, every in-sync replica must receive the message before it is considered “committed”.
  3. Set retries to a larger value. This is also a Producer parameter. When the network is unstable, sending may fail; a Producer configured with retries automatically retries the send to avoid losing the message.
  4. Set unclean.leader.election.enable = false.
  5. Set replication.factor >= 3. At least three replicas are required.
  6. Set min.insync.replicas > 1. This is a Broker-side parameter that controls how many replicas a message must be written to before it is considered “committed”. Setting it greater than 1 improves message durability. Do not use the default value 1 in a production environment. Make sure that replication.factor > min.insync.replicas: if the two are equal, the whole partition stops accepting writes as soon as one replica goes offline. The recommended setting is replication.factor = min.insync.replicas + 1.
  7. Make sure message processing completes before the offset is committed. The Consumer has a parameter enable.auto.commit; it is best to set it to false and handle the offset commit yourself.
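The client-side part of this checklist can be collected into Properties objects. This is a sketch: the class name and the concrete retries value are illustrative choices, and the Broker and topic settings (unclean.leader.election.enable, replication.factor, min.insync.replicas) are not client properties, so they appear only as a comment.

```java
import java.util.Properties;

class NoLossConfigSketch {
    // Producer settings from items 2 and 3.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("acks", "all");                                   // wait for all in-sync replicas
        props.put("retries", String.valueOf(Integer.MAX_VALUE));    // illustrative "large" value
        return props;
    }

    // Consumer setting from item 7: commit offsets manually after processing.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "127.0.0.1:9092");
        props.put("enable.auto.commit", "false");
        return props;
    }

    // Items 4 to 6 are set on the Broker or topic, for example:
    //   unclean.leader.election.enable=false
    //   min.insync.replicas=2 with replication.factor=3
}
```

These Properties would be passed to the KafkaProducer and KafkaConsumer constructors together with the serializer and deserializer settings shown earlier.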