Kafka

Kafka is a distributed messaging system that uses ZooKeeper for coordination. Its biggest strength is that it can process large volumes of data in real time to serve a wide range of scenarios: Hadoop-based batch processing systems, low-latency real-time systems, Storm/Spark streaming engines, web/Nginx logging, access logging, messaging services, and so on. Written in Scala, Kafka was created at LinkedIn, contributed to the Apache Foundation in 2010, and became a top-level open source project.

1. Introduction

The performance of a message queue and the design of its file storage mechanism are among the key indicators of the technical quality of a message queue service. The following analyzes how Kafka achieves efficient file storage, from the perspective of Kafka's file storage mechanism and physical structure, and what this means in practice.

1.1 Features of Kafka:

– High throughput and low latency: Kafka can process hundreds of thousands of messages per second with a latency of only a few milliseconds. Each topic can be divided into partitions, and consumer groups consume the partitions in parallel.

– Scalability: Kafka clusters support hot scaling

– Persistence and reliability: Messages are persisted to local disks, and data backup is supported to prevent data loss

– Fault tolerance: nodes in the cluster are allowed to fail (with a replication factor of N, up to N-1 nodes may fail)

– High concurrency: Thousands of clients can read and write data simultaneously

1.2 Usage Scenarios of Kafka:

– Log collection: a company can use Kafka to collect the logs of its various services and expose them through a unified interface to consumers such as Hadoop, HBase, and Solr.

– Messaging system: decouples producers from consumers, buffers messages, etc.

– User activity tracking: Kafka is often used to record the activities of web or app users, such as browsing, searching, and clicking. These activities are published by various servers to Kafka topics, which subscribers consume for real-time monitoring and analysis, or load into Hadoop or a data warehouse for offline analysis and mining.

– Operational metrics: Kafka is also used to record operational monitoring data. This includes collecting data for various distributed applications and producing centralized feedback for various operations, such as alarms and reports.

– Stream processing: Spark Streaming and Storm

– Event sourcing

1.3 Kafka’s design philosophy

– Election of the Kafka Broker Controller: Kafka broker clusters are managed by ZooKeeper. All Kafka broker nodes try to register a temporary (ephemeral) node on ZooKeeper; only one broker succeeds, and all the others fail. The broker that successfully registers the temporary node becomes the Kafka Broker Controller, and the other brokers are Kafka Broker followers. (This process is referred to as the Controller registering a Watch in ZooKeeper.) The Controller listens for state changes of all other Kafka brokers. If the Controller goes down, its temporary node on ZooKeeper disappears; at that point all remaining brokers again race to register a temporary node, and the one that succeeds becomes the new Controller. For example, when a broker fails, the Controller reads the state of all partitions on that broker from ZooKeeper and selects one replica from each partition's ISR list as the new partition leader. (If all replicas in the ISR list are faulty, one of the surviving replicas is selected as leader; if all replicas of the partition are down, the new leader is set to -1 and the partition waits for recovery — either waiting for any replica in the ISR to come back alive and choosing it as leader, or choosing the first replica that comes back alive, not necessarily one from the ISR.) The Controller also reports the broker's failure to ZooKeeper, and ZooKeeper notifies the other Kafka brokers.
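
The controller election described above boils down to "the first broker to create an ephemeral znode wins." Below is a minimal sketch of that pattern, assuming the plain Apache ZooKeeper Java client; the /controller path matches Kafka's convention, but the error handling and re-election logic are simplified for illustration.

```java
import org.apache.zookeeper.*;

public class ControllerElection implements Watcher {
    private final ZooKeeper zk;

    public ControllerElection(String connectString) throws Exception {
        // 6000 ms session timeout: if this broker stops heartbeating,
        // ZooKeeper deletes its ephemeral nodes after this interval.
        this.zk = new ZooKeeper(connectString, 6000, this);
    }

    /** Try to become controller; returns true if this broker won the election. */
    public boolean tryBecomeController(int brokerId) throws Exception {
        try {
            // Ephemeral node: disappears automatically when the session dies.
            zk.create("/controller", String.valueOf(brokerId).getBytes(),
                      ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
            return true;                       // we are the controller
        } catch (KeeperException.NodeExistsException e) {
            // Someone else is controller; set a watch so we notice when it disappears.
            zk.exists("/controller", true);
            return false;
        }
    }

    @Override
    public void process(WatchedEvent event) {
        if (event.getType() == Watcher.Event.EventType.NodeDeleted
                && "/controller".equals(event.getPath())) {
            // Controller is gone: every follower races to create the node again.
        }
    }
}
```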

When TalkingData was using Kafka 0.8.1, the Kafka Controller registered itself with ZooKeeper and maintained a session with a 6-second timeout. If the Controller did not communicate with ZooKeeper within those 6 seconds, ZooKeeper considered it dead and removed its temporary node; the other brokers then assumed the Controller was gone and raced to register the temporary node again, and the broker that succeeded became the new Controller. The previous Controller was supposed to shut down its various listeners and event handlers. The bug TalkingData hit was that, due to a network problem, the old Controller lost contact with ZooKeeper for more than 6 seconds, so a new Controller was elected, but the old Controller never shut down successfully. With two Controllers in the cluster, messages sent by producers could not be stored, and data piled up.

There was also a bug TalkingData hit on Kafka 0.8.1 involving acks. With ack=0, the producer returns success as soon as it has sent the message, regardless of whether the partition leader on the corresponding broker actually persisted it. With ack=1, the producer sends the message and returns success once it has been synchronously stored on the leader of the corresponding topic partition; the leader then replicates the message asynchronously to the other partition replicas. With ack=all (or -1), the producer returns success only after the message has been synchronously stored on the leader and the replicas of the corresponding topic partition. But when a Kafka Controller switch occurs, the partition leader may also switch (the partition leaders managed by the old Controller are re-elected onto other brokers), and this resulted in data loss.
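
The acks setting above maps directly onto producer configuration. The sketch below uses the modern Java producer client purely for illustration (the 0.8.x-era Scala producer exposed the same idea through a different property); the broker address and topic name are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AckDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address
        props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
        // "0"   -> fire and forget: success as soon as the request is sent
        // "1"   -> success once the partition leader has written the message
        // "all" -> success only after the leader and the in-sync replicas have it
        props.put("acks", "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
        }
    }
}
```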

– Consumer group: Each consumer thread belongs to a consumer group. Each message in a partition can be consumed by only one consumer (consumer thread) in a given consumer group. If a message needs to be consumed by multiple consumers, those consumers must belong to different groups. Kafka does not allow a partition's messages to be processed by two or more consumer threads of the same consumer group; to consume a topic several times in parallel, you start multiple consumer groups. Note also that the consumers of a group read the messages of a partition strictly in sequence; by default, a newly started consumer begins reading from the most recent position in the partition. Kafka does not let multiple threads process messages concurrently as consumers under a pessimistic "update" lock, the way AMQ does. In AMQ, when multiple threads consume data from a queue, it must be guaranteed that no two threads hold the same message, which requires row-level pessimistic locking and therefore degrades consume performance and throughput. Kafka instead allows only one consumer thread of a given consumer group to access a partition, which guarantees throughput. If that is not efficient enough, we can add partitions to scale horizontally and then add new consumer threads to consume them. If different businesses need the data of the same topic, it is better to use multiple consumer groups; each group reads messages in sequence, their offset values do not affect each other, there is no lock contention, and horizontal scalability and high throughput are fully exploited. This leads to the concept of distributed consumption.

When a consumer group is started to consume a topic, no matter how many partitions the topic has or how many consumer threads the group is configured with, the group as a whole will consume all partitions. Even if there is only one consumer thread in the group, that thread will consume all partitions. Therefore, the optimal design is for the number of consumer threads in the group to equal the number of partitions, which gives the highest efficiency.

A message of the same partition can be consumed by only one Consumer in the same Consumer Group. Multiple consumers in a consumer group cannot simultaneously consume a partition.

Under a consumer group, no matter how many consumers there are, the group will consume all partitions of the topic. When the number of consumers in the group is smaller than the number of partitions, some consumer threads consume multiple partitions (as in groupA and groupB of the original illustration); in any case, all partitions of the topic are consumed. If the number of consumers equals the number of partitions (as in groupC), efficiency is highest: each partition has exactly one consumer thread. If the number of consumers is greater than the number of partitions, the extra consumer threads are idle. Therefore, when you set up a consumer group, you only need to specify how many consumers it contains; you do not need to assign partitions to particular consumers, because consumer rebalancing handles that automatically.

Consumers in different consumer groups can consume the same message; each group still reads messages sequentially (O(1) reads), so the same batch of messages is simply consumed again by each group. Kafka cannot use multiple threads as competing consumers of one message the way AMQ does (where the message is locked so it cannot be consumed repeatedly).
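
As a concrete illustration of consumer groups, the sketch below uses the modern Java consumer client: processes started with the same group.id split the topic's partitions between them, while a process started with a different group.id receives every message again. The broker address, topic, and group name are placeholders.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class GroupConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "analytics-group");         // members sharing this id split partitions
        props.put("key.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                  "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                // Each poll returns records only from the partitions assigned to this member.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> r : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                                      r.partition(), r.offset(), r.value());
                }
            }
        }
    }
}
```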

– Consumer Rebalance:

1) Adding or removing a Consumer triggers a Consumer Group Rebalance

2) Adding or removing brokers triggers a Consumer Rebalance

– Consumer: The Consumer reads messages sequentially (O(1)), so it must keep track of the offset of the last message it read. With the high level API, the offset is stored in ZooKeeper; with the low level API, the offset is maintained by the application itself. In general, the high level API is used. By default the consumer auto-commits the offset: it reads the message and then commits offset+1. If offset+1 has already been committed but the message processing fails, the message is lost; alternatively, the consumer can be configured to commit only after the message has been fully processed, in which case it responds more slowly until processing is complete.

In general, one consumer group processes the messages of one topic. Best practice is for the number of consumers in the group to equal the number of partitions of the topic, which is the most efficient arrangement: one consumer thread handles one partition. If the group has fewer consumers than the topic has partitions, some consumer threads handle multiple partitions at once, but in the end all partitions of the topic are consumed. If the group has more consumers than the topic has partitions, the extra consumer threads do nothing while the others each handle one partition; this wastes resources, because a partition cannot be processed by two consumer threads.

In a distributed service, the number of Kafka consumers in each instance is smaller than the number of partitions of the corresponding topic, but the total number of consumers across all instances equals the number of partitions. This is because all the consumers of a distributed service belong to one consumer group (consumers in the same consumer group cannot process the same partition, while different consumer groups can process the same topic), so the messages are processed sequentially and only once by that group. If the same messages must also be processed by a different business logic, that business starts its own consumer group on the same topic.

If producer traffic increases and the number of partitions of the current topic already equals the number of consumers in the consumer group, the solution is to scale out: add partitions to the topic and add consumers to the consumer group.

 

– Delivery Mode: The producer does not need to maintain offset information for messages, because the offset is just an incrementing ID and the producer only sends messages. Kafka differs from AMQ, which is mainly used for business-logic processing; Kafka is mostly used for logs, so Kafka producers usually send large batches of messages to a topic at once, load-balanced to a partition where they are appended with incrementing offsets. The consumer side, however, must maintain the offset of the message currently being consumed in each partition. With the high level API this offset is maintained in ZooKeeper, while with the low level API it is maintained by the application itself. With the high level API, the message is read first and then offset+1 is committed automatically (or, alternatively, manually), and Kafka does not lock messages. Therefore, if processing a message fails before offset+1 has been committed, the message is consumed again when the consumer thread restarts. As a high-throughput, high-concurrency real-time system, this "at least once" behavior is usually tolerable. If it is not, use the low level API to maintain the offset yourself and commit offset+1 only when you want to.

– Topic & Partition: A topic is the equivalent of a queue in a traditional MQ messaging system. Messages sent by a producer must specify which topic they go to, but not which partition under the topic, because Kafka load-balances the messages it receives evenly across the partitions of the topic (conceptually, hash(message) % [number of partitions]). Physically, a topic is divided into one or more partitions, and each partition is equivalent to a sub-queue. In terms of physical structure, each partition corresponds to a directory (folder) on disk named [topicName]-[partition serial number]. A topic can have any number of partitions, based on service requirements and data volume. The number of partitions for new topics can be set via num.partitions in the Kafka configuration file, or specified when creating a topic, and it can also be modified after topic creation using the tools Kafka provides.
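
A simplified sketch of that partition-selection idea is shown below: keyed messages always hash to the same partition (preserving per-key ordering), while keyless messages are spread round-robin. This is only an illustration of the principle; the real client's partitioner is more involved.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified illustration of how a producer might pick a partition. */
public class SimplePartitioner {
    private final AtomicInteger counter = new AtomicInteger(0);

    public int partitionFor(String key, int numPartitions) {
        if (key == null) {
            // No key: spread messages round-robin across partitions.
            return (counter.getAndIncrement() & 0x7fffffff) % numPartitions;
        }
        // Keyed message: the same key always lands in the same partition.
        return (key.hashCode() & 0x7fffffff) % numPartitions;
    }
}
```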

In general:

1) If the number of partitions in a Topic is greater than or equal to the number of brokers, throughput can be improved.

2) Try to distribute replicas of the same Partition to different machines for high availability. When a new partition is added, the message in the partition will not be reassigned. The message data in the original partition will not be changed. The new partition will be empty at first. Messages that enter the topic then re-participate in the load balance for all partitions

– Partition Replica: Each partition stores replicas on other Kafka broker nodes, so that the failure of one broker node does not affect the Kafka cluster. Replicas are assigned in broker order; for example, with 5 broker nodes and a topic that has 3 partitions with 2 replicas each, partition 0 might be placed on broker1 and broker2, partition 1 on broker2 and broker3, and so on. (The replication factor must not be greater than the number of broker nodes, otherwise an error is reported; the replication factor here is the total number of copies of the partition, including the leader.) This way, if one broker goes down, the data in Kafka is still intact. However, the higher the replication factor, the more stable the system, but also the more resources consumed and the lower the performance; a lower replication factor increases the risk of data loss.

(1) How messages are sent: the producer sends messages to the partition leader first, and the leader then sends them to the followers of that partition on other brokers. (It would be too slow if the producer had to send to every replica itself.)

(2) How many replicas must have received a message before an ACK is sent back to the producer is determined by the acks setting

(3) How a non-working replica is handled: if the failed partition replica is not in the ACK list, the producer keeps sending messages to the partition leader and the leader simply gets no response from that follower; the system as a whole is not affected. If the failed replica is in the ACK list, a producer sending a message waits for that replica to acknowledge the write until the request times out; Kafka then automatically removes the non-working partition replica from the ACK list, and subsequent produce requests no longer involve it.

(4) How a failed replica is handled after recovery: if the replica was not in the ACK list before it failed, then after it restarts it is managed by ZooKeeper again and the partition leader resumes replicating messages to it. If it was in the ACK list before, it must be manually added back to the ACK list after restarting (the ACK list is maintained manually; a faulty partition replica is removed from it automatically).

– Partition leader and followers: a partition's replicas are divided into a leader and followers. The leader is the primary replica of the partition: when a producer writes to Kafka, it writes to the partition leader first, and the leader then pushes the data to the followers on other brokers. Information about partition leaders and followers is managed through ZooKeeper. If the broker hosting a partition leader goes down, ZooKeeper selects a new leader from that partition's followers on other brokers.

– Allocation algorithm for a topic's partitions and partition replicas (a code sketch follows the list):

1) Sort the n brokers and the partitions to be allocated.

2) Allocate the i-th partition to the (i % n)-th broker.

3) Allocate the j-th replica of the i-th partition to the ((i + j) % n)-th broker.
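
A minimal sketch of this round-robin assignment, ignoring refinements the real broker applies (such as randomizing the starting broker):

```java
import java.util.ArrayList;
import java.util.List;

public class ReplicaAssignment {
    /**
     * Returns, for each partition, the list of broker indices (0..numBrokers-1)
     * holding its replicas; element 0 is the preferred leader.
     */
    public static List<List<Integer>> assign(int numBrokers, int numPartitions,
                                             int replicationFactor) {
        List<List<Integer>> assignment = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            List<Integer> replicas = new ArrayList<>();
            for (int j = 0; j < replicationFactor; j++) {
                // Replica j of partition i goes to broker (i + j) % numBrokers.
                replicas.add((i + j) % numBrokers);
            }
            assignment.add(replicas);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 5 brokers, 3 partitions, 2 replicas each (the example used earlier).
        System.out.println(assign(5, 3, 2));   // [[0, 1], [1, 2], [2, 3]]
    }
}
```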

Message delivery reliability

Kafka provides three modes for determining how a message is successfully delivered:

– The first is to treat sending a message as a success, which of course does not guarantee that the message will be successfully delivered to the broker.

– The second is the master-slave model. The delivery is successful only when the Master and all slaves receive the message. This model provides the highest delivery reliability, but damages the performance.

– The third model: as long as the Master acknowledges receipt of the message, delivery is considered successful. In practice, the third model is chosen in most cases, as a tradeoff between reliability and performance based on application characteristics.

Reliability of messages on the broker: because messages are persisted to disk, data on a broker is not lost if the broker is stopped properly; if it is not stopped properly, messages that have not yet been flushed to disk may be lost. This can be mitigated by tuning the page cache flush interval and thresholds, but frequent disk writes may affect performance.

For the reliability of message consumption, Kafka provides an "at least once" model: the reading progress is given by the offset, which can be maintained by the consumer itself or in ZooKeeper. If the consumer fails right after consuming a message, before the offset has been written back, the message may be consumed again. This can be alleviated by adjusting the offset commit interval and threshold, or even by having the consumer wrap consumption and offset commit in its own transaction; but if your application does not care about duplicate consumption, do not solve it at all, for maximum performance.

– Partition ACK: with ack=1, the broker returns success once the producer's write has reached the partition leader, regardless of whether the other partition followers have written it. With ack=2, the broker returns success once the producer's write has reached the partition leader and one other follower, regardless of the remaining followers. With ack=-1 (all replicas of the partition), the broker returns success only after all replicas have written the message. Note that with ack=1, data is lost if a broker goes down and the partition leader fails over to a follower that had not yet received the message.

 

– Message status: in Kafka, the state of a message is kept on the consumer side. The broker does not record which message has been consumed by whom; it only records an offset value (pointing to the next message to be consumed in the partition). This means that if a consumer fails to process a message properly, a message on the broker may be consumed more than once.

– Message persistence: Kafka persists messages to the local file system with O(1) efficiency. Disk IO is normally expensive and slow, which is why databases that bottleneck on IO are moved to SSDs. But Kafka, as a high-throughput MQ, can persist messages to files very efficiently, because its writes are sequential with O(1) time complexity and therefore very fast; this is also a reason for its high throughput. Since messages are persisted sequentially, they are also consumed sequentially, which guarantees that the messages of a partition are consumed in order. On ordinary hardware, a single machine can handle on the order of 100K messages per second.

– Message validity: Kafka keeps messages for a long time so that consumers can consume them multiple times, although many of the details are configurable.

– Producer: the producer sends messages to a topic without specifying a partition. Kafka uses the partition ACK mechanism to decide whether a message was sent successfully and reports the result back to the producer. The producer can use as many threads as needed. The delivery guarantee on the producer side is At least once by default; the producer can also be configured to send asynchronously for At most once, or use key-based idempotence to achieve Exactly once.

– Kafka high throughput: Kafka's high throughput shows in both reads and writes, and distributed concurrent reads and writes are very fast. Write performance comes from sequential writes with O(1) time complexity; read performance comes from sequential reads with O(1) time complexity and from partitioning topics, so that the consumer threads of a consumer group can read sequentially at high performance.

– Kafka delivery guarantee (message delivery guarantee):

At most once: messages may be lost, but are never transmitted more than once.

At least once: messages are never lost, but may be transmitted repeatedly.

Exactly once: every message is transmitted once and only once, which is what the user actually wants.

– Batch sending: Kafka supports batch sending by message set to improve push efficiency.
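
For illustration, with the modern Java producer this batching is controlled by batch.size and linger.ms; this is a hedged sketch rather than the 0.8-era API (which configured batching differently for its async producer). Broker address and topic are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class BatchingProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("batch.size", 65536);  // accumulate up to 64 KB per partition per batch
        props.put("linger.ms", 10);      // but wait at most 10 ms for a batch to fill

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; i < 1000; i++) {
                // send() only appends to an in-memory batch; the I/O thread
                // ships whole batches to the broker.
                producer.send(new ProducerRecord<>("demo-topic", Integer.toString(i)));
            }
        }   // close() flushes any remaining batched records
    }
}
```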

– Push-and-pull: in Kafka, producers push messages to the broker, and consumers pull messages from the broker; both production and consumption of messages are asynchronous.

– Relationships between brokers in a Kafka cluster: brokers have no master-slave relationship; every broker has the same status in the cluster, and we can add or remove any broker node at will.

– Load balancing: Kafka provides a metadata API to manage load between brokers (for Kafka 0.8.x, ZooKeeper is the main mechanism for load balancing).

– Synchronous and asynchronous: the producer uses an asynchronous push mode, which greatly improves the throughput of the Kafka system. (Synchronous or asynchronous mode can be controlled by a parameter.)

– Partition mechanism: Kafka brokers support message partitioning. The producer can decide which partition to send a message to; within a partition, the order in which the producer sends messages is the order in which they are stored. A topic can have multiple partitions, and the number of partitions is configurable. The partition concept is what allows Kafka to scale horizontally as an MQ with huge throughput. Replicas of a partition live on different broker nodes; the first replica is the leader and the others are followers. Messages are written to the leader first, and the leader then pushes them to the followers. So Kafka scales horizontally through partitions.

– Offline data loading: Kafka is also ideal for loading data into Hadoop or a data warehouse due to its support for scalable data persistence.

– Real-time and offline data: Kafka supports both offline and real-time data. Because Kafka messages are persisted to files and an expiration time can be set, Kafka can be used as a highly efficient store of offline data for later analysis. Of course, as a distributed real-time messaging system it is mostly used for real-time data processing, but when consumer capacity drops, message persistence allows data to accumulate in Kafka.

– Plugins support: Active communities have developed plugins to extend Kafka’s capabilities, such as Storm, Hadoop, and Flume plugins.

– Decouple: Acts as an MQ to decouple operations performed asynchronously between producers and consumers

– Redundancy: Multiple replicas exist to ensure that a breakdown of a broker node does not affect the entire service

– Extensibility: Broker nodes can expand horizontally, partitions can increase horizontally, and partition replicas can increase horizontally

– Peak handling: Kafka scales horizontally during traffic spikes so that the application keeps functioning

– Recoverability: when a component in the system fails, partition replicas ensure that the failure does not affect the whole system.

– Sequential guarantee: Kafka producers write messages and consumers read messages sequentially, ensuring high performance.

– Buffer: the producer side may be very simple while the consumer side may be complex and involve database operations, so the producer tends to be faster than the consumer. Without Kafka, the producer would call the consumer directly, slowing down the whole system's processing speed; adding a layer of Kafka as an MQ provides a buffer.

– Asynchronous communication: As MQ, Producer communicates with Consumer asynchronously

2. Kafka file storage mechanism

2.1 Some Kafka terms are explained as follows:

The publish/subscribe object in Kafka is the topic. We can create a topic for each type of data, call the clients that publish messages to a topic producers, and call the clients that subscribe to messages from a topic consumers. Producers and consumers can read and write data from multiple topics simultaneously. A Kafka cluster consists of one or more broker servers, which are responsible for persisting and replicating the messages.

  • Broker: a single Kafka server node; multiple brokers form a Kafka cluster.
  • Topic: a category of messages; for example, page view logs and click logs can each exist as a topic. A Kafka cluster can host the distribution of multiple topics at the same time.
  • Partition: A physical grouping of topics. A topic can be divided into multiple partitions, each of which is an ordered queue
  • Segment: Partition physically consists of multiple segments, each containing message information
  • Producer: the client that produces messages and sends them to a topic
  • Consumer: the client that subscribes to a topic and consumes its messages; a consumer runs as a thread
  • Consumer Group: a consumer group contains multiple consumers, which are pre-configured in the configuration file. Each consumer thread belongs to a consumer group; each message in a partition can be consumed by only one consumer in a given group. If a message needs to be consumed by multiple consumers (consumer threads), those consumers must be in different groups. Kafka does not allow the messages of one partition to be processed by two or more consumer threads of the same consumer group. It cannot let multiple threads compete as consumers for the same message the way AMQ does: in AMQ, when multiple threads consume data from a queue, it must be guaranteed that no two threads hold the same message, which requires row-level pessimistic locking and therefore degrades consume performance and throughput. Kafka instead allows only one consumer thread of a group to access a partition, which guarantees throughput. If that is not efficient enough, we can add partitions to scale horizontally and then add new consumer threads to consume them. In this way there is no lock contention, horizontal scalability and high throughput are fully exploited, and this leads to the concept of distributed consumption.

2.2 Some principles and concepts of Kafka

1. Persistence

Kafka uses files to store messages (an append-only log), which means that Kafka's performance depends heavily on the file system, and optimizing the file system itself is very difficult on any OS. File caching and direct memory mapping are the common tools. Because Kafka only appends to its log files, the cost of disk seeks is small; to reduce the number of disk writes, the broker buffers messages temporarily and flushes them to disk when the number (or size) of messages reaches a threshold, which reduces the number of disk I/O calls. For Kafka, a higher-performance disk translates directly into better performance.

 

2. Performance

In addition to disk IO, we need to consider network IO, which directly affects Kafka's throughput. Kafka does not offer many tricks here. On the producer side, messages can be buffered and sent to the broker in batches once their number reaches a certain threshold; the same is true on the consumer side, where multiple messages are fetched in a batch. The batch size can be specified through the configuration file. On the broker side, the sendfile system call can be used to improve network IO performance: file data is mapped into system memory and written to the socket directly, so the process does not have to copy and swap the data again (this avoids the multiple copies between disk, kernel memory, process memory, and the network buffer).

In fact, the CPU cost on the producer/consumer/broker side is small, so enabling message compression is a good strategy. Compression consumes a small amount of CPU, but in Kafka's case network IO matters more, and any message transmitted over the network can benefit from compression. Kafka supports several compression codecs such as GZIP and Snappy.
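
With the modern Java producer this is a single configuration property; this is a hedged illustration (the 0.8-era producer configured the codec through a differently named property), and the broker address is a placeholder.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class CompressedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Compress whole batches before they go over the wire; "snappy" trades a
        // little CPU for much less network IO, "gzip" compresses harder but costs more CPU.
        props.put("compression.type", "snappy");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // ... send records as usual; the broker stores the batches in compressed form.
        producer.close();
    }
}
```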

 

3. Load balancing

Any broker in a Kafka cluster can provide a producer with metadata containing a list of the servers alive in the cluster and a list of partition leaders (see the node information in ZooKeeper). After receiving this metadata, the producer maintains socket connections to all partition leaders of the topic; messages are sent directly from the producer to the broker through the socket, without passing through any routing layer.

Asynchronous sending: multiple messages are buffered temporarily on the client side and sent in batches to the broker. Too many IO operations for small pieces of data would slow the network down overall, so delayed batch transmission actually improves network efficiency. There are pitfalls, however, such as losing buffered but unsent messages when the producer fails.
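
One way to at least detect such failures is to attach a callback to each asynchronous send. This is a hedged sketch using the modern Java client (the 0.8-era async producer did not offer a per-message callback); broker address and topic are placeholders.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AsyncSendWithCallback {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("demo-topic", "value"),
                (metadata, exception) -> {
                    if (exception != null) {
                        // The batched send failed; log or re-send from the application side.
                        exception.printStackTrace();
                    } else {
                        System.out.printf("stored at partition=%d offset=%d%n",
                                          metadata.partition(), metadata.offset());
                    }
                });
            producer.flush();   // force any buffered batches out before exiting
        }
    }
}
```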

 

4. The Topic model

In other JMS implementations, the position of message consumption is kept by the provider in order to avoid duplicate delivery or redelivery of messages that failed to be consumed, and also to control message state; this adds a lot of extra work for the JMS broker. In Kafka, a partition's messages are consumed by only one consumer per group, there is no message state to manage, and there is no complex acknowledgement mechanism. When a consumer receives a message, it can store the offset of the last message locally and intermittently register that offset with ZooKeeper. As a result, the consumer client is also lightweight.

In Kafka, the consumer is responsible for maintaining the record of which messages it has consumed; the broker is not. This design not only improves the flexibility of the consumer side but also modestly reduces the complexity of the broker side, and it is a difference from many JMS providers. In addition, the ACK design of Kafka messages is very different from JMS: Kafka messages are delivered to the consumer in batches (usually by number of messages or chunk size), and when messages have been consumed successfully their offset is committed to ZooKeeper rather than acknowledged to the broker. You may already sense that with this "loose" design there is a danger of "losing" or "redelivering" messages.

 

5. Consistent message transmission

Kafka provides three message transfer consistency semantics: at most once, at least once, and exactly once.

At least once: messages may be retransmitted and therefore processed more than once.

At most once: messages may be lost.

Exactly once: not literally that each message is transmitted only once, but a mechanism guarantees that there is no reprocessing and no loss of data.

 

At most once: the consumer fetches a message, saves the offset, and then processes the message. If the consumer process fails (crashes) during processing, after the offset has already been saved, some messages can never be processed: another consumer may take over, but because the offset was saved ahead of time, the new consumer cannot fetch the messages before that offset (even though they were never processed). This is "at most once".

At least once: the consumer fetches a message, processes it, and then saves the offset. If saving the offset fails (for example because of a ZooKeeper exception or a consumer failure in the offset-saving stage) after the message has been processed successfully, the next fetch may return the message that was already processed. This is "at least once".
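
The difference between the two is purely the ordering of "commit offset" and "process message". A hedged sketch with the modern Java consumer, with auto-commit disabled so the ordering is explicit (broker address, topic, and group are placeholders):

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class CommitOrdering {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder
        props.put("group.id", "ordering-demo");
        props.put("enable.auto.commit", "false");          // we commit by hand
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("demo-topic"));
            while (true) {
                for (ConsumerRecord<String, String> r : consumer.poll(Duration.ofMillis(500))) {
                    // At most once: commit first, then process (a crash here loses the message).
                    // consumer.commitSync();
                    // process(r);

                    // At least once: process first, then commit (a crash here reprocesses it).
                    process(r);
                    consumer.commitSync();
                }
            }
        }
    }

    private static void process(ConsumerRecord<String, String> record) {
        System.out.println(record.value());   // stand-in for real business logic
    }
}
```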

For the "Kafka cluster to consumer" scenario, the following scheme can achieve "exactly once" consistency semantics:

At least once, plus recording the maximum offset of processed messages in the consumer's output: because the maximum processed offset is known, messages that have already been processed are not processed again.

 

6. Replication

In Kafka, replication is done at the partition level, not the topic level; Kafka replicates each partition's data to multiple servers. Each partition has one leader and several followers, and the number of replicas can be set in the broker configuration file. The leader handles all read and write requests, and the followers must stay in sync with the leader. A follower acts like a "consumer": it fetches messages and appends them to its local log. The leader keeps track of all followers; if a follower falls too far behind or fails, the leader removes it from the replicas synchronization list. A message is considered "committed" only when all followers have saved it successfully, and only then can consumers consume it. This synchronization strategy requires a good network between the followers and the leader. As long as the ZooKeeper cluster is alive, even if only one replica survives, messages can still be sent and received.

When selecting a new leader, the number of partition leaders already hosted on the candidate server is taken into account: a server hosting too many partition leaders comes under more IO pressure. So leader election also considers "load balancing": brokers with fewer partition leaders are more likely to become the new leader.

 

7. Log

Each log entry has the format "4-byte number N giving the length of the message" + "N bytes of message content". Each message has an offset that uniquely identifies it; the offset is an 8-byte number giving the message's position within the partition. On the physical storage layer, each partition consists of multiple log files (called segments). A segment file is named after the minimum offset it contains, e.g. 00000000000000000000.kafka; the minimum offset indicates the offset of the first message in that segment.
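
The framing described here (a 4-byte big-endian length followed by the payload) can be read back with a few lines of Java. This is a simplified illustration of the article's description only; a real Kafka message entry also carries fields such as a CRC, magic byte, and attributes, which are ignored here, and the file name is a placeholder.

```java
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;
import java.io.IOException;

public class SegmentScanner {
    public static void main(String[] args) throws IOException {
        try (DataInputStream in =
                 new DataInputStream(new FileInputStream("00000000000000000000.kafka"))) {
            long offset = 0;                       // base offset of this segment
            while (true) {
                int length;
                try {
                    length = in.readInt();         // 4-byte big-endian length N
                } catch (EOFException eof) {
                    break;                         // end of segment
                }
                byte[] payload = new byte[length]; // N bytes of message content
                in.readFully(payload);
                System.out.printf("offset=%d size=%d%n", offset, length);
                offset++;
            }
        }
    }
}
```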

To fetch a message, you specify the offset and a maximum chunk size: the offset gives the starting position of the message, and the chunk size bounds the total length fetched (and, indirectly, the number of messages). Using the offset, the segment file containing the message can be located; subtracting the segment's minimum offset gives the message's relative position within the file, and the data can then be read directly.

 

8. Distribution

Kafka uses ZooKeeper to store meta information, and uses ZooKeeper Watch to detect meta changes and act accordingly (e.g. consumer failure, load balancing, etc.).

Broker Node Registry: When a Kafka Broker is started, its node information (temporary ZNode) is first registered with ZooKeeper. The ZNode is also deleted when the Broker is disconnected from ZooKeeper.

Broker Topic Registry: When a Broker is started, it registers its held Topic and partitions information with ZooKeeper and remains a temporary ZNode.

Consumer and Consumer Group: When each Consumer client is created, it registers its own information with ZooKeeper. This function is mainly for “load balancing “. Multiple consumers in a group can interleave all partitions of a topic. In short, ensure that all partitions in the topic are consumed by the group, and that partitions are relatively evenly distributed to each consumer for performance purposes.

Consumer ID Registry: Each Consumer has a unique ID(host: UUID, which can be specified in a configuration file or generated by the system) that is used to mark Consumer information.

Consumer offset Tracking: Tracks the largest offset of the partition currently consumed by each Consumer. The zNode is a persistent node, and offset is related to group_id to indicate that when a consumer in the group fails, other consumers can continue to consume.

Partition Owner Registry: used to mark which consumer is consuming a given partition (a temporary znode). It records that a partition can be consumed by only one consumer in the group at a time; when a consumer in the group fails, load balancing is triggered (the partitions that are "freed" are taken over by other consumers).

When consumer starts, the action triggered is:

A) Conduct the “Consumer ID Registry” first;

B) Register a watch under the “Consumer ID Registry” node to listen for “leave” and “join” of other consumers in the current group; Any change in the list of nodes under this ZNode path triggers load balancing for consumers under this group (for example, when one consumer fails, other consumers take over partitions).

C) Register a watch under the “Broker ID Registry “node to monitor the survival of the Broker; If the broker list changes, a consumer rebalance is triggered for all groups.

 

Conclusion:

1) The Producer end uses ZooKeeper to “discover “broker lists, establish socket connections with each partition leader under the Topic, and send messages.

2) The Broker side uses ZooKeeper to register broker information and to monitor the liveness of partition leaders.

3) The Consumer side uses ZooKeeper to register Consumer information, including the partition list for Consumer consumption, discover broker lists, establish socket connections with the Partition leader, and obtain messages.

 

9. Leader election

At the heart of Kafka are log files, and the synchronization of log files across clusters is the most fundamental element of distributed data systems.

We would not need followers if the leader never went down! Once the leader is down, a new leader must be selected from the followers. However, the followers themselves may lag too far behind or crash, so a high-quality follower must be chosen as the leader. It must be guaranteed that once a message has been committed and the leader goes down, the newly elected leader can still serve that message. Most distributed systems use a majority-vote rule to select a new leader; under majority vote, the most suitable leader is chosen dynamically according to the state of all replica nodes. Kafka does not use this method.

Kafka dynamically maintains a set of in-sync replicas (ISR), whose nodes are highly consistent with the leader. A message must be replicated to every node in this set and appended to its log before the outside world is notified that the message has been committed. Therefore, any node in this set can be elected leader at any time. The ISR is maintained in ZooKeeper. If the ISR has f+1 nodes, messages are not lost and service continues normally even when f nodes go down. ISR membership is dynamic: if a node falls behind and is removed, it can rejoin the ISR once it returns to the "in sync" state. This leader election method is very fast and well suited to Kafka's scenario.

An evil thought: What if all the nodes go down? Kafka’s guarantee that data will not be lost is based on the fact that at least one node is alive. Once all nodes are down, this cannot be guaranteed.

In practice, you must react in a timely manner when all replicas are down. There are two options:

1. Wait for any ISR node to recover and act as the leader.

2. Select the first recovered node (not only the ISR node) as the leader.

This is a tradeoff between availability and consistency. If you wait for a node from the ISR to recover, the cluster may never recover if none of those nodes can come back or their data is lost. If you instead take the first node that recovers, even one outside the ISR, its data is treated as the current data even though it may differ from the true data, because some data may not have been synchronized to it. Kafka currently chooses the second strategy; future releases will make this choice configurable so it can be tuned flexibly per scenario.

This dilemma applies not only to Kafka, but to almost all distributed data systems.

 

10. Replica management

The discussion above concerns only one topic and one partition, but in reality a Kafka cluster manages thousands of topic partitions. Kafka tries to distribute all partitions evenly across all nodes of the cluster rather than concentrating them on a few nodes, and it also balances the leader-follower relationships so that each node is the leader of a proportional share of partitions.

It is also important to optimize the leader election process, which determines how long the window of unavailability lasts when a node fails. Kafka designates one node as the "controller"; when a node goes down, the controller is responsible for electing new leaders for all of that node's partitions from among the surviving replicas. This lets Kafka manage the leader-follower relationships of many partitions in batch, efficiently. If the controller itself goes down, one of the remaining nodes becomes the new controller.

 

11. Leader and replica synchronization

For a partition, the broker that stores the primary copy is the partition's "leader" and the brokers that store the backup copies are its "followers". A backup copy replicates the primary copy's messages exactly, including attributes such as message offsets. To keep the backup copies consistent with the primary, Kafka runs a consumer-like fetch process on each broker that stores a backup copy. Typically a partition has one primary copy and zero or more backup copies; the total number (primary + backups) is configurable, and different topics can use different values. Note that producers and consumers communicate only with the broker that holds the primary copy, the leader.

 

Kafka allows topic partitions to have a configurable number of replicas. You can configure the number of replicas for each topic. Kafka automatically backs up data on each replica, so data is still available when a node goes down.

Replication in Kafka is not mandatory; you can configure just one replica, which is equivalent to keeping only a single copy of the data.

Replicas are created in topic partitions, each with a leader and zero or more followers. All read and write operations are handled by the leader. Generally, the number of partitions is much larger than the number of brokers, and the leaders of each partition are evenly distributed among brokers. All followers copy the leader’s log, and the messages and order in the log are the same as those in the leader. Followers pull messages from the Leader and store them in their own log file just like normal Consumer.

Many distributed messaging systems automatically handle failed requests, and they have a clear definition of whether a node is alive. Kafka determines whether a node is alive under two conditions:

1. The node must be able to maintain the connection to ZooKeeper. ZooKeeper checks the connection to each node through the heartbeat mechanism.

2. If the node is a follower, it must be able to synchronize the leader’s write operations in a timely manner without a long delay.

A node that meets the above criteria is technically "in sync", rather than vaguely "alive" or "failed". The leader keeps track of all "in sync" nodes and removes a node if it goes down, gets stuck, or falls too far behind. How far behind counts as "too far" is determined by replica.lag.max.messages, and what counts as stuck is determined by replica.lag.time.max.ms.

A message is “committed” only when all replicas are added to the log, and only committed messages are sent to the consumer so that there is no fear that the message will be lost if the leader goes down. The Producer can also choose whether to wait for notification that the message is committed, which is determined by the parameter acks.

Kafka guarantees that “committed” messages are not lost as long as there is a “synchronizing” node.

2.3 Kafka Topology

 

A typical Kafka cluster consists of several producers (web front-end FETs, server logs, etc.), several brokers (Kafka supports horizontal scaling; the more brokers, the higher the cluster throughput), several consumer groups, and a ZooKeeper cluster. Kafka uses ZooKeeper to manage cluster configuration: it elects the broker controller and rebalances when consumer groups change, and ZooKeeper also stores the offsets of the partitions consumed by consumers. Producers publish messages to brokers in push mode; consumers subscribe to and consume messages from brokers in pull mode.

 

The analysis process is divided into the following four steps:

  • Partition storage distribution in a topic
  • File storage in a partition (a partition is a directory (folder) on the Linux server)
  • Partition segment file storage structure
  • How to find a message in a partition by offset

By analyzing these four steps in detail, we can clearly understand the workings of Kafka's file storage mechanism.

 

2.3 Partition storage distribution in a topic

Assume the Kafka cluster has only one broker and xxx/message-folder is the root directory for storing data files (configured in the Kafka broker's properties file with log.dirs=xxx/message-folder). For example, create two topics named report_push and launch_info, each with partitions=4.

The storage path and directory rules are as follows:

xxx/message-folder

|–report_push-0

|–report_push-1

|–report_push-2

|–report_push-3

|–launch_info-0

|–launch_info-1

|–launch_info-2

|–launch_info-3

In Kafka's file store, a topic has multiple partitions, and each partition is a directory. The partition directory is named topic name + ordinal number: the first partition's ordinal starts at 0, and the largest is the number of partitions minus 1.

When messages are sent, they are sent to a topic, which is essentially a directory structure. A topic is composed of partitions, whose organization is described below.

A partition behaves like a queue: the messages in each partition are ordered, produced messages are continuously appended to the partition, and each message is assigned a unique offset value.

The Kafka cluster stores all messages, whether they are consumed or not; We can set the expiration time of messages so that only expired data is automatically cleared to free disk space. For example, if we set the message expiration time to 2 days, all messages within these 2 days will be saved to the cluster, and data will only be cleared after 2 days.

Kafka only maintains offset values per partition, because the offset identifies which message in the partition will be consumed next. Each time a consumer consumes a message, the offset advances by one. The state of a message is entirely controlled by the consumer, which can track and reset the offset at will, so the consumer can read messages from any position.

Storing message logs as partitions has multiple considerations. First, it is easy to scale in a cluster. Each Partition can be adjusted to fit the machine on which it resides, and a topic can be composed of multiple partitions, so the whole cluster can accommodate any size of data. The second is that concurrency can be improved because it can be read and written on a Partition basis.

As you can see, data in Kafka is persistent and fault-tolerant. Kafka allows users to set the number of replicas per topic, which determines how many brokers to store written data. If your replica number is set to 3, a copy of data will be stored on 3 different machines, allowing 2 machines to fail. It is generally recommended that the number of replicas be at least 2, so that data consumption is not affected when the machine is added, deleted, or restarted. If you have higher requirements for data persistence, you can set the number of copies to 3 or more.

Topics in Kafka are stored as partitions; each topic can set its number of partitions, which determines how many partition logs make up the topic. When producing data, the producer publishes messages to the topic's partitions according to a rule (which can be customized). The replication described above is done per partition, and for each partition only one replica is elected leader for reads and writes.

Considerations about how to set the partition value. A partition can only be consumed by one consumer (a consumer can consume multiple partitions at the same time), so if the number of partitions set is less than the number of consumers, some consumers will not consume data. Therefore, the number of recommended partitions must be greater than the number of consumers running at the same time. On the other hand, it is recommended that the number of partitions be larger than the number of brokers in the cluster, so that the leader partitions can be evenly distributed among brokers, resulting in cluster load balancing. Cloudera has hundreds of partitions per topic. Note that Kafka needs to allocate some memory for each partition to cache message data, and the larger the number of partitions, the larger the heap space allocated to Kafka.

2.4 File storage mode in a Partition

  • Each partition is equivalent to one huge file that is divided into multiple equally-sized segment data files; however, the number of messages in each segment file is not necessarily equal. This property makes it fast to delete old, already-consumed segment files.
  • Each partition only needs to support sequential reads and writes. The segment file lifetime is determined by server configuration parameters.

In this way, unnecessary files can be deleted quickly and disk utilization can be improved.

 

2.5 Partition segment file storage structure

The producer sends messages to a topic, and the messages are evenly distributed across its partitions (randomly or according to a user-specified callback function). When a Kafka broker receives a message, it appends the message to the last segment of the corresponding partition. When the number of messages in a segment reaches the configured value, or the elapsed time exceeds the flush threshold, the messages are flushed to disk; only messages flushed to disk can be consumed by consumers. When a segment reaches a certain size, the broker creates a new segment instead of continuing to write to the old one.

Each segment corresponds to an index in memory, which records the offset of the first message in the segment.

  • Segment file: consists of two main files, an index file and a data file, which come in pairs; the suffixes ".index" and ".log" denote the segment index file and the segment data file respectively.
  • A segment file is named after the offset of the last message in the previous segment (the first segment of a partition starts at 0). The value is at most a 64-bit long, and the file name is the offset as a decimal number left-padded with zeros to a fixed width, as in the sketch below.
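
A minimal illustration of this naming convention; the padding width is chosen to match the example file names used later in this section.

```java
public class SegmentNaming {
    /** Builds a segment file name for a given base offset, zero-padded as in the examples. */
    static String segmentFileName(long baseOffset, String suffix) {
        return String.format("%020d%s", baseOffset, suffix);
    }

    public static void main(String[] args) {
        System.out.println(segmentFileName(0L, ".log"));        // 00000000000000000000.log
        System.out.println(segmentFileName(368769L, ".index")); // 00000000000000368769.index
    }
}
```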

Each segment stores many messages, and a message's ID (its offset) is determined by its logical position: the message ID maps directly to the message's storage location, avoiding an extra mapping from ID to location.

Create a topic named topicXXX containing 1 partition, set the segment size to 500MB, and start a producer writing a large amount of data to the Kafka broker; Figure 2 illustrates these two rules:

 

2.6 How do I Search for a Message using Offset in a Partition

For example, to read the message whose offset=368776, perform the following two steps.

Step 1: locate the segment file.

Taking Figure 2 above as an example: 00000000000000000000.index is the first file, with a starting offset of 0. The second file, 00000000000000368769.index, has a starting offset of 368770 (= 368769 + 1). Likewise, the third file, 00000000000000737337.index, has a starting offset of 737338 (= 737337 + 1), and so on for subsequent files. The files are named after their starting offsets and sorted by that value, so a binary search of the file list by offset quickly locates the specific file. For offset=368776, this lands on 00000000000000368769.index (and its .log file).

Step 2: locate the message within the segment file. Having mapped offset=368776 to 00000000000000368769.index in step 1, the index metadata gives the physical position in 00000000000000368769.log, and the log file is then scanned from that position until the message with offset=368776 is found.
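Step 1 amounts to a binary search for the largest segment base offset that is not greater than the target offset. A hedged sketch of that lookup, with the segment base offsets taken from the example above:

```java
import java.util.Arrays;

public class SegmentLookup {
    /** Returns the base offset (file name) of the segment that contains targetOffset. */
    static long findSegmentBaseOffset(long[] sortedBaseOffsets, long targetOffset) {
        int pos = Arrays.binarySearch(sortedBaseOffsets, targetOffset);
        if (pos >= 0) {
            return sortedBaseOffsets[pos];            // target equals one of the base offsets
        }
        int insertionPoint = -pos - 1;                // index of first element greater than target
        return sortedBaseOffsets[insertionPoint - 1]; // largest base offset <= target
    }

    public static void main(String[] args) {
        long[] segments = {0L, 368769L, 737337L};     // segment file names from the example
        long base = findSegmentBaseOffset(segments, 368776L);
        // Prints 368769: the message lives in 00000000000000368769.log,
        // at relative position (368776 - base) within that segment.
        System.out.println(base);
    }
}
```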

The segment index file uses sparse index storage, which reduces the size of the index file and allows it to be operated on directly in memory through mmap. The sparse index sets a metadata pointer for only some of the messages in the data file; it saves more storage space than a dense index, but costs more time to search.

Kafka originally recorded offsets in ZooKeeper, but frequent writes through the ZK client API are inefficient. Kafka 0.8.2 introduced native offset storage, moving offset management out of ZooKeeper so it can scale horizontally. The principle is to use Kafka's compacted topic: offsets are committed directly to a compacted topic keyed by the combination of consumer group, topic, and partition. Kafka also keeps these triples in memory so that consumers can fetch the latest offset information directly from memory, and it can quickly checkpoint the latest offsets to disk.

3. Partition Replication principle

Kafka efficient file storage design features

  • Kafka splits a topic's large partition file into several small segment files. These small files make it easy to periodically clean up or delete data that has already been consumed and to reduce disk usage.
  • Indexing information allows you to quickly locate messages and determine the maximum size of a response.
  • Index metadata can be mapped to memory to avoid I/O operations on segment files.
  • Sparse storage of index files can greatly reduce the space occupied by index file metadata.