Partition
Significance of partitioning
Improve load balancing capability
Kafka uses partitioning to improve load balancing in two ways:
- Kafka creates topics so that partitions are evenly distributed across brokers (cluster nodes)
- When a Kafka producer sends messages to the cluster, the data is evenly distributed among the partitions through a load balancing policy
In this way, the load balance of the cluster is guaranteed at two levels
Achieve high scalability of the system
Different distributed systems have different names for partitions: Kafka calls them partitions, MongoDB and Elasticsearch call them shards, HBase calls them regions, and Cassandra calls them vnodes. The implementations may differ on the surface, but the underlying idea of partitioning is the same
- The scalability of Kafka is also reflected in partitions: we can dynamically increase the number of partitions of a topic to improve overall system throughput, as sketched below.
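For illustration, here is a minimal sketch of dynamically growing a topic's partition count with the Java AdminClient; the topic name "demo-topic", the bootstrap address, and the target count of 6 are placeholder assumptions, and note that partition counts can only be increased, never reduced.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewPartitions;

public class ExpandPartitions {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        try (AdminClient admin = AdminClient.create(props)) {
            // Grow the hypothetical topic "demo-topic" to 6 partitions in total
            admin.createPartitions(
                    Collections.singletonMap("demo-topic", NewPartitions.increaseTo(6))
            ).all().get();
        }
    }
}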
Partitioning strategies
- The default policy: if a partition is explicitly specified, the message is sent directly to that partition
- If no partition is specified but a key is, the partition is chosen based on the hash value of the key
- If neither partition nor key is specified, the round-robin (polling) policy is used; the three cases are sketched below
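A minimal sketch of the three cases, assuming a topic named "demo-topic" and string key/value types; the constructors shown are from the standard Java client.

import org.apache.kafka.clients.producer.ProducerRecord;

public class RecordRoutingSketch {
    static void buildRecords() {
        // Partition specified explicitly: the record goes straight to partition 0
        ProducerRecord<String, String> toPartition0 =
                new ProducerRecord<>("demo-topic", 0, null, "payload");

        // Key specified, no partition: the partition is derived from the hash of the key
        ProducerRecord<String, String> byKey =
                new ProducerRecord<>("demo-topic", "order-42", "payload");

        // Neither partition nor key: the producer's default strategy decides
        ProducerRecord<String, String> unkeyed =
                new ProducerRecord<>("demo-topic", "payload");
    }
}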
Customize a partition policy
- Refer to the previous section for details
- You need to explicitly set the producer configuration parameter partitioner.class. When writing the producer program, you implement the org.apache.kafka.clients.producer.Partitioner interface in a concrete class. The interface is very simple, defining only two methods: partition() and close().
int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
- topic, key, keyBytes, value, and valueBytes are the message data, and cluster is the cluster metadata (such as how many topics, brokers, and so on there are in the current Kafka cluster). Kafka hands you all of this information so that you can use it to decide which partition the message should be sent to; an illustrative implementation follows
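A minimal sketch of a custom partitioner, assuming a hypothetical routing rule (keys starting with "audit" always land on the last partition, everything else is spread by key hash); configure() comes from the Configurable interface that Partitioner extends.

import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;

public class AuditAwarePartitioner implements Partitioner {

    @Override
    public void configure(Map<String, ?> configs) {
        // read custom producer settings here if needed
    }

    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        // hypothetical routing rule: "audit*" keys go to the last partition
        if (key != null && key.toString().startsWith("audit")) {
            return numPartitions - 1;
        }
        // everything else is spread by the key's hash (0 for null keys)
        return Math.abs(key == null ? 0 : key.hashCode()) % numPartitions;
    }

    @Override
    public void close() {
        // release any resources held by the partitioner
    }
}

The producer would then register this class through the partitioner.class setting (ProducerConfig.PARTITIONER_CLASS_CONFIG in the Java client).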
Cross-region data center case
- For deployments that span data centers in different regions, we want production to happen locally; that is, local producers only need to send data to the partitions of the Kafka cluster that live in the local data center
- The kafka-topics tool supports creating a topic with an explicit assignment of partitions to brokers (for example, placing the Beijing partition on a broker in Beijing)
- When consuming, you can adopt the same localization strategy: a local application consumes only the local partitions, using consumer.assign() to subscribe directly to the specified partitions (a sketch follows)
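A rough sketch of such a localized consumer, assuming a hypothetical local bootstrap address "bj-broker:9092", topic "demo-topic", and that partition 0 is the one assigned to the local data center; consumer.assign() bypasses consumer-group rebalancing and reads only the given partitions.

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class LocalPartitionConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "bj-broker:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bj-local-app");            // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // subscribe directly to the "local" partition instead of the whole topic
            consumer.assign(Collections.singletonList(new TopicPartition("demo-topic", 0)));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.printf("partition=%d offset=%d value=%s%n",
                            record.partition(), record.offset(), record.value());
                }
            }
        }
    }
}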
Polling strategy
- Also known as the Round-Robin policy: sequential allocation. For example, if there are three partitions under a topic, the first message is sent to partition 0, the second to partition 1, the third to partition 2, and so on; when the fourth message is produced, allocation starts over at partition 0
- The polling policy is the default partitioning policy provided by the Kafka Java producer API. If you do not set the partitioner.class parameter, your producer program distributes messages evenly across all partitions of the topic in a round-robin fashion.
- Polling has excellent load balancing performance and always ensures that messages are evenly distributed across all partitions as much as possible, so it is by default the most reasonable partitioning strategy and one of the most commonly used partitioning strategies
Random strategy
- Also known as Randomness strategy. Randomness means that we randomly place messages on any partition
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());
Order preservation by message Key strategy
- Kafka allows you to define a message Key, called a Key for short, for each message. The Key can be a string with a clear business meaning, such as customer code, department number, or business ID. It can also be used to represent message metadata
- Once messages are defined with a Key, you can ensure that all messages with the same Key go into the same partition. Because messages within each partition are processed in order, this is called the order-preservation-by-message-Key strategy
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();
- If the number of partitions is changed, data with the same key can no longer be guaranteed to land in the same partition
Existing problems
- Suppose we now define three business keys, but the message production rates of the three keys differ greatly. Since, as the diagram above shows, a given key is only ever stored in one particular partition, doesn't that sacrifice scalability? If one key's production rate is very high while the other two are modest, the partition data becomes unbalanced
- That is exactly the problem, so using keys to make such logical distinctions is not very common in production environments. If the message rates of different keys vary widely, consider splitting them into different topics instead
example
- In many cases we assume data must be ordered globally. On closer inspection, however, global order is usually unnecessary; we only need ordering among messages that share a common feature. We can therefore promote that common feature to the message Key to guarantee ordering.
- For example, when synchronizing MySQL data to Kafka, we want the consumed data to reproduce exactly the same state as in MySQL. In this case we can use tableName + primary key as the message Key to ensure that changes to the same row are not reordered (a sketch follows).
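A minimal sketch of that idea, assuming a hypothetical topic "mysql-binlog", table "orders", and primary key value "10086"; the point is only that every change to the same row carries the same key and therefore lands in the same partition.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class BinlogKeyedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            String table = "orders";     // hypothetical table name
            String primaryKey = "10086"; // hypothetical primary key value
            // tableName + primary key as the message key: all changes to the same row
            // share one key, hence one partition, hence their relative order is kept
            String messageKey = table + ":" + primaryKey;
            producer.send(new ProducerRecord<>("mysql-binlog", messageKey, "{\"op\":\"UPDATE\"}"));
        }
    }
}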
Replication
- To improve message reliability, each Kafka topic partition has N replicas, where N (greater than or equal to 1) is the replication factor of the topic. Kafka implements automatic failover through this multi-replica mechanism, which keeps the service available when a broker in the Kafka cluster fails.
- When replicating, Kafka ensures that the partition log is written to the other nodes in order. Among the N replicas, one is the leader and the others are followers. The leader processes all read and write requests for the partition.
- The number of replicas is less than or equal to the number of brokers; that is, for each partition, each broker holds at most one of its replicas. By default, the replicas of all partitions are evenly distributed across all brokers (a topic-creation sketch follows).
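As an illustration, a topic with 3 partitions and a replication factor of 3 could be created like this with the Java AdminClient (topic name and address are placeholders); it requires at least 3 brokers, since a broker holds at most one replica of any given partition.

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateReplicatedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (AdminClient admin = AdminClient.create(props)) {
            // 3 partitions, replication factor 3: one leader and two followers per partition
            NewTopic topic = new NewTopic("demo-topic", 3, (short) 3);
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}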
How replicas work
- Producers always write messages to the leader replica, and consumers always read messages from the leader replica. That is, the Leader is responsible for reading and writing requests
- As for the follower replica, it does only one thing: it periodically sends requests to the leader replica asking for the latest produced messages, so that it can keep up with the leader. In other words, the follower is just a backup of the data, ready to take over when the leader goes down
ISR mechanism
background
- Each topic partition can have multiple replicas (one Leader plus Followers). The Followers synchronize the Leader's data to form the replicas. To ensure that the data sent by the producer reliably reaches the designated topic, each partition of the topic needs to send an acknowledgement (ack) back to the producer after receiving the data. If the producer receives the ack, it proceeds to the next round of sending; otherwise it resends the data.
- In Kafka, the ack is sent only after all the followers have finished synchronizing. The advantage is that, when a new Leader is elected, tolerating n node failures requires only n+1 replicas. The disadvantage is high latency, because every follower must complete synchronization first.
- If one of the followers cannot complete data synchronization because of network delay or some other fault, the Leader blocks and waits until that follower finishes, which greatly affects performance
- Therefore, Kafka introduces a new mechanism: the ISR (In-Sync Replicas). The number of replicas still has some effect on Kafka's throughput, but availability is greatly enhanced
- So Kafka's data replication is neither fully synchronous nor fully asynchronous, but ISR-based
ISR
A collection of followers that are in sync with the Leader. The Leader does not need to wait for all followers to finish synchronizing; once the followers in the ISR have synchronized the data, the Leader can send the ack to the producer
If a follower in the ISR set lags behind for longer than the configured threshold (replica.lag.time.max.ms), it is removed from the ISR. The Leader only needs to ensure that data reaches the followers in the ISR set. If the Leader fails, a new Leader is elected from the followers in the ISR collection.
AR
All replicas are collectively called Assigned Replicas, or AR.
ISR is a subset of AR. Each partition has an ISR, which is dynamically maintained by the leader. Followers lag behind somewhat when synchronizing data from the leader (the criteria include the lag time replica.lag.time.max.ms and, in older versions, the lagged message count replica.lag.max.messages).
Any follower that exceeds the threshold is removed from the ISR and placed in the Out-of-Sync Replicas (OSR) list. Newly added followers also start in the OSR. AR = ISR + OSR.
features
- Follower replicas do not provide services, but simply asynchronously pull data from the leader replicas on a regular basis. Since it is asynchronous, there is a risk that it will not be possible to synchronize with the Leader in real time.
- The leader maintains a list of replicas that are basically in sync with it, called the In-Sync Replicas (ISR). Each partition has an ISR, which is dynamically maintained by the leader
- The Leader replica is naturally in the ISR. That is, the ISR is not just a collection of follower replicas, it necessarily includes Leader replicas. Even in some cases, the ISR has only one copy, the Leader
- If a follower falls too far behind the leader, or has not sent a data replication request for a certain amount of time, the Leader removes it from the ISR
- The Leader commits only when all replicas in the ISR send ACK packets to the Leader
Judgment criteria (replica.lag.time.max.ms)
- This criterion is the replica.lag.time.max.ms parameter value at the Broker end.
- This parameter indicates the maximum interval at which the Follower replica can lag behind the Leader replica. The current default is 10 seconds.
- This means that as long as a Follower replica does not lag behind the Leader by more than 10 consecutive seconds, Kafka considers it in sync with the Leader, even if at that moment the Follower holds noticeably fewer messages than the Leader.
- The Follower replica's only job is to keep pulling messages from the Leader replica and writing them to its own log. If this synchronization stays slower than the rate at which messages are written to the Leader replica, then after replica.lag.time.max.ms the Follower is considered out of sync with the Leader and can no longer remain in the ISR. Kafka then automatically shrinks the ISR set, "kicking" the replica out of the ISR.
- If the replica slowly catches up with the Leader, it can be added back to the ISR. This also suggests that the ISR is a dynamically adjusted set rather than a statically invariant one
Before version 0.9.0 there was also a parameter that decided whether a follower stayed in the ISR based on how many messages it lagged behind the leader; if the gap exceeded the configured threshold, the follower was kicked out of the ISR. This had an obvious problem: when traffic spiked, replicas that were synchronizing normally could be pushed out of the ISR. A server.properties sketch of the current time-based threshold follows.
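A minimal broker-side sketch, assuming the stated 10-second default purely for illustration:

# server.properties (broker side), illustrative value only
# A follower that has not caught up with the leader for longer than this
# interval is removed from the ISR; 10000 ms is the stated default.
replica.lag.time.max.ms=10000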
Producer configuration
request.required.acks
- 0: equivalent to asynchronous fire-and-forget. The leader does not need to reply and the producer returns immediately, so a send is considered successful as soon as it goes out; messages can be lost if the network times out or the broker crashes
- 1: the leader sends an ack once it has received the message; if no ack arrives, the producer resends, so the probability of loss is small
- -1: the leader sends the ack only after the message has been successfully synchronized to all followers; the possibility of message loss is very low (see the sketch below)
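On the modern Java producer the same three levels are set through the acks property (ProducerConfig.ACKS_CONFIG); a minimal sketch, with the alternatives left as comments:

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class AcksConfigSketch {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.ACKS_CONFIG, "all"); // "all" is equivalent to -1: wait for the in-sync replicas
        // props.put(ProducerConfig.ACKS_CONFIG, "1"); // ack once the leader has written the message
        // props.put(ProducerConfig.ACKS_CONFIG, "0"); // fire-and-forget: no acknowledgement at all
        return props;
    }
}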
The significance of the replication mechanism
Provides data redundancy
- The system continues to function even when parts of it fail, increasing overall availability and data persistence.
Provide high scalability
- Supports horizontal scaling to improve read performance and throughput by adding more machines.
- This actually depends not only on the copy mechanism, but also on the partitioning mechanism (sharding)
Improve data locality
- Reduces system latency by allowing data to be placed near the user’s geographical location.
- Kafka's follower replicas provide neither of the last two benefits: they do not help the leader replica share read traffic the way MySQL slave libraries do, nor are replicas placed close to clients to improve data locality.
Replica roles (Data consistency assurance)
Leader Replica
- The leader replica provides services externally, by which I mean interacting with client programs
Election of leader replicas
- When the leader replica hangs, or the broker hosting the leader replica goes down, Kafka relies on the monitoring capability provided by ZooKeeper to sense this in real time and immediately starts a new round of leader election, selecting a new leader from among the follower replicas.
- The old Leader replica can only be added to the cluster as a follower replica after being restarted.
- Kafka calls all surviving replicas that are not in the ISR out-of-sync replicas. In general, out-of-sync replicas lag too far behind the Leader, so electing one of them as the new Leader can cause data loss; after all, the messages they hold fall far behind those in the old Leader. In Kafka, electing such a replica is called an Unclean leader election. The broker-side parameter unclean.leader.election.enable controls whether Unclean leader election is allowed (a config sketch follows this list).
- Enabling Unclean Leader election may result in data loss, but the upside is that it keeps the Leader copy of the partition in existence and prevents it from stopping external services, thus improving high availability. On the other hand, the upside of banning Unclean leader election is that it maintains data consistency and avoids message loss at the expense of high availability
- If the LEOs of the replicas in the ISR are inconsistent and the leader fails at that moment, the new leader is elected according to the order of replicas in the ISR, not according to whose LEO is highest
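A broker-side sketch of the switch mentioned above, shown with the consistency-favouring value; whether to flip it to true is exactly the availability-versus-consistency trade-off described here:

# server.properties (broker side), illustrative
# false: never elect an out-of-sync replica as leader (no data loss, but the
#        partition may become unavailable if no ISR replica survives)
# true:  allow unclean leader election (higher availability, possible data loss)
unclean.leader.election.enable=false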
Follower Replica
- The follower replica does not process client requests, and its only task is to synchronize with the leader replica by asynchronously pulling messages from the leader replica and writing them to its own commit log
In many other systems, follower replicas can serve requests externally; for example, MySQL slave databases handle read requests
Design concept
- Easy to implement Read-your-writes consistency: after you write a message to Kafka with the producer API, you can immediately read it back with the consumer API.
When you post a tweet, for example, you expect to see it right after publishing. This is a classic read-your-writes scenario. If follower replicas were allowed to serve requests, then because replica synchronization is asynchronous, a follower might not yet have pulled the latest message from the leader, and the client would not be able to see the message it just wrote.
- Convenient to implement Monotonic Reads: a consumer will never see a message appear and then disappear across repeated reads.
High availability implementation
- Multiple Broker processes can run on the same machine, but it is more common to spread brokers across different machines so that if one machine in the cluster goes down, even if all Broker processes running on it die, Brokers on other machines can still provide services.
- Another means of achieving high availability is Replication. The idea is simple: copy the same data to multiple machines. These copies are called replicas in Kafka.
- There is a configurable number of replicas that hold the same data but have different roles and functions. Kafka defines two types of replicas: Leader Replica and Follower Replica.
scalability
- The replica mechanism ensures that data is persisted or messages are not lost, but it does not solve the scalability problem
- The partitioning mechanism in Kafka divides each topic into multiple partitions, each of which is an ordered set of message logs. Each message produced by the producer is sent to only one partition.
- Replicas are defined at the partition level. Each partition can be configured with multiple replicas: exactly one leader replica and N-1 follower replicas.
Why doesn’t Kafka design for read-write separation
- Kafka's partitions already spread the read load across multiple brokers, unlike MySQL's master-slave setup, where the load falls on the master
- Kafka is a message queue, so consumption involves tracking offsets; a database stores entity data and has no such concept.
- On the producer side, Kafka can be configured to control whether to wait for followers to acknowledge a message. If reads were served from followers as well, a message would also have to be acknowledged by all the followers before replying to the producer, causing performance to decline
- Redis and MySQL both support master-slave read-write separation, which I personally think is related to their usage scenarios. For load types with lots of reads and relatively few writes, using read/write separation is a good solution – we can add a lot of followers to improve read performance. Kafka, on the other hand, is primarily a message engine rather than a data store that provides read services. It usually involves frequent production and consumption of messages. This is not typical of read and write scenarios, so read/write separation schemes are not suitable for this scenario.