1. Introduction to Kafka

1. Introduction

Apache Kafka is a distributed stream-processing platform. It has the following characteristics:

  • Supports message publishing and subscription, similar to message queues such as RabbitMQ and ActiveMQ
  • Supports real-time data processing
  • Guarantees reliable message delivery
  • Supports persistent storage of messages and ensures fault tolerance through a multi-replica distributed storage scheme
  • Offers high throughput: a single Broker can easily handle thousands of partitions and millions of messages per second

2. Basic concepts

Messages And Batches

Kafka’s basic unit of data is called a message. To reduce network overhead and improve efficiency, multiple messages are written to the same Batch.

Topics And Partitions

Kafka messages are categorized by Topics, and a topic can be divided into Partitions, each of which is a commit log. Messages are appended to the partition and then read in first-in, first-out order. Kafka implements data redundancy and scalability through partitions, which can be distributed across different servers, meaning that a Topic can span multiple servers to provide greater performance than a single server.

Because a Topic contains multiple partitions, sequential messages cannot be guaranteed across the entire Topic, but sequential messages can be guaranteed within a single partition.

Producers And Consumers

  1. Producers

The producer is responsible for creating messages. In general, the producer distributes messages evenly across all partitions of a topic and does not care which partition a particular message is written to. If we want to write messages to a specific partition, we can do so with a custom partitioner.

  2. Consumers

Consumers belong to consumer groups and are responsible for consuming messages. A consumer can subscribe to one or more topics and reads messages in the order in which they were produced. Consumers keep track of which messages they have already read by examining offsets. The offset is a monotonically increasing value that Kafka assigns to each message as it is created, and it is unique for each message within a given partition. The consumer stores the last read offset of each partition in Zookeeper or in Kafka itself, so that if it shuts down or restarts it can retrieve the offset and not lose its read position.

A partition can only be read by one consumer within the same consumer group, but it can be read by consumers from different consumer groups. When consumers in multiple consumer groups read the same topic, they do not affect each other.

Brokers And Clusters

A single Kafka server is called a Broker. The Broker receives messages from producers, assigns offsets to them, and commits the messages to disk for storage. The Broker also serves consumers, responding to requests to read partitions by returning messages that have been committed to disk.

A Broker is part of a Cluster. Each cluster elects one Broker as the cluster Controller, which is responsible for administrative work, including assigning partitions to brokers and monitoring brokers.

In a cluster, a Partition belongs to a Broker, which is called the Leader of that partition. A partition can be assigned to multiple Brokers, in which case partition replication takes place. This replication mechanism provides message redundancy for the partition, so that if one Broker fails, another Broker can take over leadership.

2. Kafka producers

1. Partition policies

Reasons for partitioning

  • It makes it easy to scale within a cluster: each Partition can be sized to fit the machine it lives on, and a topic can be composed of multiple partitions, so the cluster can accommodate data of any size.
  • It improves concurrency, because reads and writes can be performed at the Partition level.

Partitioning principles

The data sent by the producer is encapsulated in a ProducerRecord object.

  1. If a partition is specified, that value is used directly as the partition.
  2. If no partition is specified but a key is present, the partition is obtained by taking the hash of the key modulo the number of partitions of the topic.
  3. If neither a partition nor a key is present, a random integer is generated on the first call (and incremented on subsequent calls), and this value modulo the number of available partitions gives the partition. This is the Round Robin algorithm (see the sketch after this list).
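As a rough illustration of rules 2 and 3, the sketch below hand-implements Kafka's Partitioner interface. The class name SimplePartitioner is hypothetical, and the hashing and random-fallback logic is a simplification of what the built-in default partitioner actually does.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

// Hypothetical custom partitioner illustrating the rules described above:
// hash the key when one is present, otherwise fall back to a random partition.
public class SimplePartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            // No key: pick a partition at random (the built-in partitioner is smarter:
            // it starts from a random value and then increments it per batch).
            return ThreadLocalRandom.current().nextInt(numPartitions);
        }
        // Key present: hash of the key modulo the number of partitions of the topic.
        return (Arrays.hashCode(keyBytes) & 0x7fffffff) % numPartitions;
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
}
```

Such a partitioner would be registered on the producer via the partitioner.class setting (ProducerConfig.PARTITIONER_CLASS_CONFIG).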

2. Ensuring data reliability

To ensure that the data sent by the producer reliably reaches the designated topic, each partition of the topic sends an ACK (acknowledgement) back to the producer after receiving the data. If the producer receives the ACK, it sends the next round of data; otherwise it resends the data.

Replica data synchronization strategies

| Strategy | Advantage | Disadvantage |
| --- | --- | --- |
| Send the ACK once more than half of the replicas have synchronized | Low latency | To tolerate the failure of n nodes when electing a new leader, 2n+1 replicas are required |
| Send the ACK only after all replicas have synchronized | To tolerate the failure of n nodes when electing a new leader, only n+1 replicas are required | High latency |

Kafka chose the second strategy, for the following reasons:

  1. To tolerate the failure of n nodes, the first strategy requires 2n+1 replicas, while the second requires only n+1. Each partition in Kafka holds a large amount of data, so the first strategy would create a great deal of data redundancy.
  2. Although the second strategy has higher network latency, this has little impact on Kafka (the replicas are transferred within the same network environment).

ISR

The Leader maintains a dynamic in-sync replica set (ISR), which is the set of followers that are in sync with the Leader. Once the followers in the ISR have completed synchronization, the leader sends the ACK to the producer. If a follower fails to fetch data from the leader for too long, it is removed from the ISR; this time threshold is set by the replica.lag.time.max.ms parameter. When the Leader fails, a new Leader is elected from the ISR.

Ack response mechanism

For some less important data, the reliability requirements are not very high and a small amount of data loss can be tolerated, so there is no need to wait for all the followers in the ISR to receive the data successfully.

Kafka therefore provides three levels of reliability, and the user chooses among the following settings by trading off reliability against latency.

acks parameter settings

  • 0: The producer does not wait for the broker’s ACK. This gives the lowest latency; the broker returns as soon as it receives the message, before it has been written to disk, so data may be lost if the broker fails.
  • 1: The producer waits for the broker’s ACK, and the partition’s leader returns the ACK after writing the message to disk. If the leader fails before the followers finish synchronizing, data is lost.
  • -1 (all): The producer waits for the broker’s ACK, which is returned only after the partition’s leader and all followers in the ISR have written the message to disk. However, if the leader fails after the followers finish synchronizing but before the broker sends the ACK, the producer resends the message to the new leader, causing duplicate data. A configuration sketch follows this list.
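Below is a minimal producer configuration sketch showing where the acks setting goes; the broker address localhost:9092, the topic my-topic, and the class name AcksExample are placeholders.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class AcksExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Reliability level: "0", "1" or "all" (equivalent to -1), as described above.
        props.put(ProducerConfig.ACKS_CONFIG, "all");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value")); // placeholder topic
        }
    }
}
```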

Data consistency (failure handling details)

  • If a follower fails, it is temporarily removed from the ISR. After the follower recovers, it reads the last HW (High Watermark) recorded on its local disk, truncates the part of its log that is above the HW, and starts synchronizing from the HW with the leader. Once the follower’s LEO (Log End Offset) is greater than or equal to the partition’s HW, that is, once the follower has caught up with the leader, it can rejoin the ISR. For example, if the recovered follower’s LEO is 15 and the partition HW is 10, it truncates offsets 10 through 14 and re-fetches them from the leader.
  • If the leader fails, a new leader is elected from the ISR. To keep the data consistent across replicas, the remaining followers then truncate the parts of their logs that are above the HW and synchronize from the new leader.

Note: This only guarantees data consistency between replicas, not data loss or duplication.

3. Exactly Once semantics

Setting the server-side ACK level to -1 ensures that no data is lost between the Producer and the Server, which is At Least Once semantics. By contrast, setting the ACK level to 0 ensures that the producer sends each message only once, which is At Most Once semantics.

At Least Once guarantees that data is not lost, but it may be duplicated; At Most Once guarantees that data is not duplicated, but it may be lost. However, for some very important information, such as transaction data, downstream consumers require that data be neither duplicated nor lost, i.e. Exactly Once semantics.

Prior to version 0.11, there was nothing Kafka could do about this other than ensuring that data was not lost and having downstream consumers perform global de-duplication. With multiple downstream applications, each must de-duplicate globally on its own, which has a significant impact on performance.

Kafka 0.11 introduced a major feature: idempotence. Idempotent means that no matter how many times Producer sends repeated data to the Server, the Server persists only one. Idempotence combined with At Least Once semantics forms Kafka’s Exactly Once semantics. That is:

  • At Least Once + idempotence = Exactly Once

To enable idempotence, simply set enable.idempotence to true in the producer configuration.

Kafka’s idempotence essentially moves the de-duplication that previously had to be done downstream into the upstream of the data pipeline. A producer with idempotence enabled is assigned a PID during initialization, and messages sent to the same partition carry Sequence Numbers. The Broker caches these, and for messages with the same primary key it persists only one.

However, the PID changes after a restart, and different partitions have different primary keys, so idempotence cannot guarantee Exactly Once across partitions and sessions.
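A minimal sketch of enabling the idempotent producer follows; the broker address and class name are placeholders. The PID and sequence numbers described above are then handled transparently by the client and the broker.

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class IdempotentProducerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Enable the idempotent producer; the client will also require acks=all.
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Retries of the same record are de-duplicated by the broker within this producer session.
        }
    }
}
```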

Idempotent send

As mentioned above, one way to achieve Exactly Once is to make the downstream system idempotent. For Kafka Streams, the Kafka Producer is itself such a “downstream” system, which allows Kafka Streams to support Exactly Once semantics to some extent.

To implement the idempotent semantics of Producer, Kafka introduces the Producer ID (PID) and Sequence Number. Each new Producer is assigned a unique PID when initialized, which is completely transparent to the user and not exposed to the user.

For each PID, the data the Producer sends to each <Topic, Partition> carries a Sequence Number that increases monotonically from 0.

Similarly, the Broker maintains a sequence number for each <PID, Topic, Partition> and increments it each time a message is committed. For each message it receives, the Broker accepts it only if its sequence number is exactly one greater than the sequence number the Broker maintains (that is, the sequence number of the last committed message); otherwise it is rejected or discarded (see the simplified sketch after this list):

  • If the message’s sequence number is more than one greater than the number the Broker maintains, some data in between has not yet been written, i.e. the messages are out of order, and the Broker rejects the message
  • If the message’s sequence number is less than or equal to the number the Broker maintains, it is a duplicate message, and the Broker discards it
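The following is a simplified, hypothetical sketch of that broker-side check, not Kafka’s actual implementation; it only illustrates the “accept exactly last+1, reject gaps, drop duplicates” rule.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: track the last committed sequence number per (PID, topic, partition)
// and only accept the message whose sequence number is exactly one greater.
public class SequenceCheckSketch {

    private final Map<String, Long> lastSequence = new HashMap<>(); // key = "pid-topic-partition"

    public boolean accept(long pid, String topic, int partition, long sequence) {
        String key = pid + "-" + topic + "-" + partition;
        long last = lastSequence.getOrDefault(key, -1L);
        if (sequence == last + 1) {
            lastSequence.put(key, sequence);   // in-order, new message: commit it
            return true;
        }
        if (sequence <= last) {
            return false;                      // duplicate message: silently discard
        }
        // sequence > last + 1: a gap, i.e. out-of-order / missing data: reject with an error
        throw new IllegalStateException(
                "out-of-order sequence " + sequence + ", expected " + (last + 1));
    }
}
```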

This design addresses two issues that existed before 0.11.0.0:

  • The Broker saves a message but crashes before sending the ACK; the Producer assumes the send failed and retries, causing duplicate data
  • An earlier message fails to send while a later one succeeds, and the earlier one then succeeds on retry; as a result, the data is out of order

3. Kafka consumers

1. Consumption pattern

The Consumer uses pull mode to read data from the broker.

The push model has difficulty adapting to consumers with different consumption rates, because the send rate is determined by the broker. Its goal is to deliver messages as quickly as possible, but this easily overwhelms consumers, typically showing up as denial of service and network congestion. The pull model lets the consumer fetch messages at a rate appropriate to its own consumption capacity.

The downside of the pull model is that if Kafka has no data, the consumer may spin in a loop that keeps returning empty results. For this reason, Kafka consumers pass a timeout when polling for data: if no data is currently available, the consumer waits up to that amount of time before returning.
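A minimal consumer sketch illustrating the pull model with a poll timeout; the broker address, group id, and topic name are placeholders.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PullConsumerExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder topic
            while (true) {
                // Pull mode: block for at most 1 second; returns empty if no data is available.
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}
```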

2. Partition allocation policy

A consumer group contains multiple consumers and a topic contains multiple partitions, so partition allocation is inevitably involved, that is, determining which partition is consumed by which consumer.

Kafka has two built-in allocation strategies: RoundRobin and Range.

The RoundRobin strategy takes the partition number modulo the number of consumers and assigns the partitions to consumers one by one, in round-robin order.

3. Maintain offset

Because a consumer may suffer failures such as a power outage while consuming, it needs to resume from where it left off after recovering, so it must record in real time the offset up to which it has consumed.

Offsets are stored keyed by group + topic + partition (GTP).

Before Kafka 0.9, consumers stored offsets in Zookeeper by default. Starting with 0.9, consumers store offsets in a built-in Kafka topic, __consumer_offsets, by default (for offsets, the consumer effectively acts as a producer writing to this topic).
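The sketch below turns off auto-commit, commits offsets manually, and reads back the committed position for one group + topic + partition. The broker address, group id, and topic name are placeholders, and the Set-based committed() call assumes a reasonably recent client version.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commit offsets manually
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-topic"));        // placeholder topic
            consumer.poll(Duration.ofSeconds(1));                             // ... process records ...
            consumer.commitSync(); // writes this group+topic+partition offset to __consumer_offsets

            // Read back the committed position for one partition of the topic.
            TopicPartition tp = new TopicPartition("my-topic", 0);
            Map<TopicPartition, OffsetAndMetadata> committed =
                    consumer.committed(Collections.singleton(tp));
            System.out.println("committed offset: " + committed.get(tp));
        }
    }
}
```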

4. In-depth understanding of Kafka replicas

1. Kafka cluster

Kafka uses Zookeeper to maintain information about the brokers. Each broker has a unique identifier, broker.id, within the cluster, which can be configured in server.properties or generated automatically. The following is how a Kafka broker cluster assembles itself automatically:

  • When each broker starts, it creates an ephemeral node under Zookeeper’s /brokers/ids path and writes its broker.id there to register itself with the cluster.
  • When there are multiple brokers, they all compete to create the /controller node on Zookeeper. Since a Zookeeper node cannot be created twice, only one broker succeeds; this broker becomes the controller broker. In addition to doing everything an ordinary broker does, it manages the state of topic partitions and their replicas.
  • When a broker crashes or exits, its Zookeeper session times out, which triggers the Watcher events registered on Zookeeper, and Kafka then performs fault tolerance. A new controller election is also triggered if the controller broker goes down (the sketch after this list shows how to inspect the current controller).
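As a small sketch (placeholder broker address, hypothetical class name), the AdminClient API can be used to inspect the brokers in the cluster and the controller that this election produces:

```java
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.DescribeClusterResult;

public class ClusterInfoExample {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            DescribeClusterResult cluster = admin.describeCluster();
            System.out.println("brokers:    " + cluster.nodes().get());      // all registered brokers
            System.out.println("controller: " + cluster.controller().get()); // current controller broker
        }
    }
}
```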

2. Replica mechanism

To ensure high availability, Kafka partitions are multi-replica: if one replica is lost, the partition’s data can still be obtained from the other replicas. This requires each replica’s data to be complete, which is the basis of Kafka’s data consistency and is why the controller broker is needed for dedicated management. Kafka’s replica mechanism is explained in detail below.

Partitions and replicas

Kafka topics are divided into partitions, which are Kafka’s basic unit of storage. Each partition can have multiple replicas. One of them is the leader replica, and all events are sent directly to the leader replica. The others are follower replicas, which replicate from the leader to keep their data consistent with it. When the leader replica becomes unavailable, one of the follower replicas becomes the new leader.

ISR mechanism

Each partition has an in-sync replica (ISR) list that tracks all synchronized, available replicas. The leader replica is by definition an in-sync replica, and a follower replica must satisfy the following conditions to be considered in sync:

  • It has an active session with Zookeeper, that is, it sends heartbeats to Zookeeper periodically.
  • It has fetched messages from the leader replica with low lag within the configured time window.

If a replica does not meet these conditions, it is removed from the ISR list and is not added back until it meets them again.

Unclean leader election

Related to the replica mechanism, there is an optional broker-level configuration parameter, unclean.leader.election.enable, whose default value is false, which forbids unclean leader election. It controls whether, when the leader replica goes down and no other replica in the ISR is available, an out-of-sync replica is allowed to become the leader. Allowing this may lead to data loss or data inconsistency, which may be unacceptable in scenarios with high data-consistency requirements (such as finance), so the default is false. Set it to true if some data inconsistency can be tolerated.

Minimum in-sync replicas

Another parameter related to the ISR mechanism is min.insync.replicas, which can be configured at the broker or topic level and specifies how many replicas must at least be available in the ISR list. Suppose its value is 2: when the number of available replicas falls below 2, the whole partition is considered unavailable. At that point, a client writing data to the partition receives the exception org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
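As a sketch of what this looks like from the client side (placeholder broker address and topic, hypothetical class name), a producer callback can detect the rejection; note that min.insync.replicas is only enforced when acks=all:

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.errors.NotEnoughReplicasException;
import org.apache.kafka.common.serialization.StringSerializer;

public class MinIsrExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // min.insync.replicas only applies with acks=all
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("my-topic", "key", "value"), (metadata, exception) -> {
                if (exception instanceof NotEnoughReplicasException) {
                    // Fewer in-sync replicas than min.insync.replicas: the write was rejected.
                    System.err.println("write rejected: " + exception.getMessage());
                }
            });
            producer.flush();
        }
    }
}
```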

Send confirmation

Kafka producers have an optional acks parameter, which specifies how many partition replicas must receive a message before the producer considers the write successful:

  • acks=0: the message is considered sent successfully as soon as it is sent out, without waiting for any response from the server
  • acks=1: the producer receives a success response from the server as soon as the partition leader has received the message
  • acks=all: the producer receives a success response from the server only after all in-sync replicas have received the message

3. Data request

Metadata request mechanism

Of all the replicas, only the leader replica can read and write messages. Since the leader replicas of different partitions may be on different brokers, if a broker receives a request for a partition whose leader replica is not on that broker, it returns a Not a Leader for Partition error response to the client. To solve this problem, Kafka provides a metadata request mechanism.

Every broker in the cluster caches the partition replica information of all topics, so clients can send metadata requests to any broker; clients send these requests periodically and cache the result. The refresh interval can be set with the client configuration metadata.max.age.ms. With the metadata, the client knows which broker holds the leader replica and sends read and write requests directly to that broker.
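For illustration, the refresh interval mentioned above is just a client property; the sketch below sets it on a producer (placeholder broker address, hypothetical class name and 60-second value):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.ProducerConfig;

public class MetadataRefreshConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        // Refresh cached topic/partition metadata at most every 60 seconds (the default is 5 minutes).
        props.put(ProducerConfig.METADATA_MAX_AGE_CONFIG, "60000");
        // The same property name, metadata.max.age.ms, also applies to consumers.
    }
}
```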

If a leader election for a partition happens between two scheduled metadata refreshes, the cached information may be stale, and the client may receive the Not a Leader for Partition error response. In that case, the client issues a new metadata request, refreshes its local cache, and then sends the operation to the correct broker.

Data visibility

Note that not all data stored on the partition leader can be read by clients. To ensure data consistency, only messages that have been stored by all in-sync replicas (all replicas in the ISR), that is, messages below the high watermark (HW), can be read by clients.

Finally

You can follow my WeChat official account to learn and make progress together.