Background: As with other distributed systems, there is no single best number of partitions per topic in a Kafka cluster. In practice, the business side often simply requests a partition expansion based on the processing capacity of its producers or consumers; instead, the number of partitions should be determined according to the specific scenario.
For the business topics hosted on a Kafka cluster, the number of partitions should reflect the scale of the business and deliver as much throughput as possible. However, more partitions do not automatically mean higher throughput and processing power; the analysis is usually carried out jointly by the business side and the underlying platform side.
The following sections cover the advantages and disadvantages of topics with many partitions, so that the partition count can be planned properly for your requirements and scenario.
Contents
- More partitions can provide higher throughput
- More partitions require more open file handles
- More partitions may increase overall unavailability
- More partitions may increase end-to-end latency
- More partitions may require more memory on the client side
- Conclusion
More partitions can provide higher throughput
First, understand that a topic partition is the unit of parallelism in a Kafka cluster.
On both the producer and the broker side, writes to different partitions can be done fully in parallel, so expensive operations such as compression can make use of more hardware resources.
On the consumer side, Kafka always delivers the data of a single partition to a single consumer thread, so the number of partitions also limits the maximum number of consumer threads that can work in parallel. If consumers cannot keep up with the data in the required time, this can be alleviated either by optimizing the client program or by increasing the number of partitions (and the consumer threads with them). The latter, however, requires a topic-level change and a check that producers do not rely on the existing key-to-partition binding.
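As a minimal sketch of this limit (the bootstrap address, group id, and topic name below are placeholders), a consumer group member only ever receives data from the partitions assigned to it, so running more group members than the topic has partitions leaves the extras idle:

```java
import java.time.Duration;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PartitionBoundConsumer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // Each instance in the group is assigned a disjoint subset of the topic's partitions.
        // Partition count is therefore the upper bound on consumer parallelism.
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("demo-topic")); // placeholder topic
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            System.out.println("assigned partitions: " + consumer.assignment());
            System.out.println("fetched records: " + records.count());
        }
    }
}
```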
Therefore, in general, more partitions in a Kafka cluster mean higher throughput.
The number of partitions can generally be estimated with a simple throughput-based calculation.
Measure the throughput you can achieve on a single partition for production (call it P) and for consumption (call it C). If the target throughput is T, the number of partitions can be set to max(T/P, T/C).
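As a small back-of-envelope sketch of that rule (the throughput figures below are made up for illustration):

```java
// Rough sizing helper: not part of Kafka's API, just back-of-envelope math.
public final class PartitionSizing {

    /**
     * Minimum partition count needed to reach a target throughput T,
     * given measured per-partition producer (P) and consumer (C) throughput.
     * All values must use the same unit, e.g. MB/s.
     */
    static int requiredPartitions(double t, double p, double c) {
        double byProducer = Math.ceil(t / p);
        double byConsumer = Math.ceil(t / c);
        return (int) Math.max(byProducer, byConsumer);
    }

    public static void main(String[] args) {
        // Example: target 200 MB/s, single partition measured at 20 MB/s produce, 50 MB/s consume.
        System.out.println(requiredPartitions(200, 20, 50)); // prints 10
    }
}
```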
On the producer side, parameters such as the batch size, the compression type, the acks setting, and the replication factor all affect single-partition throughput, so they should be fixed when measuring it. As shown in LinkedIn's benchmarking, a single partition can typically handle writes on the order of tens of MB per second.
Throughput on the consumer side is usually application-dependent, because it corresponds to how fast the consumer logic can process each message, so it has to be measured in practice.
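A hedged sketch of such a measurement setup with the standard Java producer client is shown below; the bootstrap address and the specific values for batch size, linger time, compression, and acks are placeholders to be tuned per environment:

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class ThroughputTunedProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        // Parameters that influence per-partition throughput:
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);   // larger batches per partition
        props.put(ProducerConfig.LINGER_MS_CONFIG, 10);           // wait briefly to fill batches
        props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4"); // trades CPU for network/disk
        props.put(ProducerConfig.ACKS_CONFIG, "all");             // stronger durability, lower throughput

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // produce benchmark records here and measure MB/s achieved on a single partition
        }
    }
}
```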
Although you can add partitions to a topic as needed after it has been created, be careful if messages carry keys.
When a producer writes a keyed message, Kafka uses a hash of the key to determine the partition the data maps to. This routes messages with the same key to the same partition, preserving the ordering of that class of messages.
If the number of partitions is then increased, keyed messages may suddenly map to different partitions, breaking their ordering. To avoid this, topics are usually over-partitioned when the service is first onboarded: create more partitions at topic creation time than are immediately needed, based on a forecast of future throughput. Even if the cluster is still small, the topic can be planned ahead in this way and its partitions migrated to new brokers as the cluster grows. Otherwise, a partition count sized only for the small initial cluster causes trouble later; with advance planning, producers that use keyed messages can keep up with throughput growth without breaking the semantics of the application.
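To illustrate why adding partitions breaks key ordering, the sketch below maps a key to a partition with a generic hash modulo the partition count (Kafka's default partitioner actually uses a murmur2 hash, but the effect is the same): once the partition count changes, the same key can map to a different partition.

```java
// Illustrative only: Kafka's default partitioner uses murmur2, not String.hashCode().
public class KeyRoutingDemo {
    static int partitionFor(String key, int numPartitions) {
        return Math.abs(key.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        String key = "order-42";
        System.out.println(partitionFor(key, 8));   // partition chosen with 8 partitions
        System.out.println(partitionFor(key, 12));  // likely a different partition with 12 partitions
    }
}
```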
In addition to throughput, there are other factors to consider when planning the number of partitions, because increasing the partition count brings other impacts as well.
More partitions require more open file handles
Each partition maps to a directory on the broker's disk, and each segment in that log directory consists of four files:
- 00000000000029487197.index: offset index file
- 00000000000029487197.log: the actual log (data) file
- 00000000000029487197.snapshot: snapshot file
- 00000000000029487197.timeindex: time index file
In Kafka, the broker opens file handles for the files of every segment, so more partitions mean more open file handles. This is mostly a matter of configuration: in a production cluster the OS-level limit on open files is typically raised, for example to 30 thousand or more.
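As a purely illustrative back-of-envelope estimate (the partition and segment counts below are assumptions, and the exact set of files the broker keeps open varies by version):

```java
// Hypothetical estimate of open file handles on one broker.
public class FileHandleEstimate {
    public static void main(String[] args) {
        int partitionsPerBroker = 2000;   // assumed
        int segmentsPerPartition = 5;     // depends on segment size and retention
        int filesPerSegment = 4;          // the four segment files listed above
        System.out.println(partitionsPerBroker * segmentsPerPartition * filesPerSegment); // 40000
    }
}
```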
More partitions may increase overall unavailability
Overall availability can be reduced because, after a failure, more partitions need to recover, and replica synchronization across many partitions also consumes network bandwidth.
Kafka's replication mechanism provides high availability and durability. When a partition has multiple replicas, each replica is stored on a different broker, and the replicas are designated as a leader and followers.
Internally, Kafka manages all of these replicas automatically and keeps them in sync. Producer and consumer requests for a partition are served by its leader replica. When a broker goes down, the leader replicas on that node become temporarily unavailable, and the controller inside Kafka switches leadership of those partitions to other eligible replicas as quickly as possible so that read and write requests can be served again (subject to data consistency and reliability requirements).
Note: switching a leader requires the controller to fetch the metadata of the affected partition's replicas from ZooKeeper, and within the controller these ZooKeeper operations are performed serially.
In general, when a broker is shut down gracefully, the controller proactively moves the leaders off that broker, one at a time. Switching a single leader usually takes only a few milliseconds, so a graceful restart leaves only a small unavailability window for clients.
However, when a broker is shut down abnormally (e.g. kill -9), the observed unavailability can be proportional to the number of partitions. Suppose a broker hosts 2000 partitions, each with 2 replicas; that broker is then the leader for roughly 1000 partitions, and all 1000 become unavailable at the moment it fails. If electing a new leader for a single partition takes 5 ms, it takes about 5 s for 1000 partitions.
Therefore, for some partitions, the observed unavailability may be 5 seconds plus the time required to detect a failure.
Worse, if the broker that failed happens to be the controller, leader election for those partitions will not start until the controller itself has failed over to a healthy broker.
The controller fails over automatically, but the new controller has to read metadata for every partition from ZooKeeper during initialization.
For example, if there are 10,000 partitions in a Kafka cluster and each partition takes 2 ms to initialize metadata from ZooKeeper, this might add 20 seconds to the unavailable window.
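The two unavailability estimates above can be written out as simple arithmetic (all numbers are the illustrative ones from the text):

```java
public class UnavailabilityEstimate {
    public static void main(String[] args) {
        // Hard failure of a broker leading ~1000 partitions, ~5 ms of leader election each.
        int leaderPartitions = 1000;
        double electionMs = 5;
        System.out.println("leader failover: " + (leaderPartitions * electionMs) / 1000.0 + " s"); // 5.0 s

        // Hard failure of the controller: the new controller reads metadata for every partition.
        int totalPartitions = 10_000;
        double zkReadMs = 2;
        System.out.println("controller failover: " + (totalPartitions * zkReadMs) / 1000.0 + " s"); // 20.0 s
    }
}
```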
In general, abnormal shutdowns are rare. However, if you care about availability in these rare cases, it is best to limit the number of partitions per broker to two to four thousand and the total number of partitions in the cluster to the low tens of thousands.
More partitions may increase end-to-end latency
End-to-end latency in Kafka is defined as the time from when the producer publishes a message to when the consumer reads it.
Kafka exposes a message to consumers only after it has been committed, i.e. replicated to all in-sync replicas. The time to commit a message can therefore be a significant part of the end-to-end latency.
By default, a Kafka broker uses only a single thread to replicate data from another broker, shared across all of the partitions the two brokers have in common. (The number of replica fetcher threads can be increased via configuration, e.g. num.replica.fetchers.)
Experiments have shown that replicating 1000 partitions from one broker to another can add about 20 ms of latency, which means the end-to-end latency is at least 20 ms; that is rather high for some real-time computing applications. In a larger cluster, though, the problem is mitigated because the partitions are spread across more brokers, so each of the remaining brokers only needs to fetch a fraction of the partitions from any single broker on average.
As a result, the added latency from committing a message is only a few milliseconds, not tens of milliseconds.
If latency is a concern, the partition count can be bounded with the following rule of thumb:
It is probably a good idea to limit the number of partitions per broker to 100 × B × R, where B is the number of brokers in the Kafka cluster and R is the replication factor.
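A small sketch of that rule of thumb (the broker count and replication factor below are illustrative):

```java
public class LatencyRuleOfThumb {
    // Suggested upper bound on partitions per broker: 100 * B * R.
    static int maxPartitionsPerBroker(int brokers, int replicationFactor) {
        return 100 * brokers * replicationFactor;
    }

    public static void main(String[] args) {
        // e.g. a 10-broker cluster with replication factor 2 => at most ~2000 partitions per broker
        System.out.println(maxPartitionsPerBroker(10, 2));
    }
}
```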
More partitions may require more memory on the client side
Internally, the producer client buffers messages per partition. Once enough data has accumulated or enough time has passed, the buffered messages are drained and sent to the broker.
If you increase the number of partitions, messages accumulate in more per-partition buffers in the client, and the total memory used may exceed the configured memory limit.
When this happens, the producer has to either block or drop new messages, neither of which is ideal. To prevent this, the producer should be reconfigured with a larger memory buffer.
As a rule of thumb, for good throughput you should allocate at least a few tens of kilobytes of producer memory for each partition being produced to, and adjust the total when the number of partitions increases significantly.
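A hedged sketch of the relevant producer memory settings in the standard Java client (the values are illustrative, derived from the tens-of-KB-per-partition rule of thumb above):

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerMemorySettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Total memory for buffering unsent records across all partitions.
        // e.g. 1000 partitions * 64 KB ≈ 64 MB.
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 64L * 1024 * 1024);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 64 * 1024);
        // How long send() may block when the buffer is full before throwing.
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 60_000);
    }
}
```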
Consumers have a similar issue: they fetch messages from the broker in per-partition batches, so the more partitions a consumer consumes, the more memory it needs. In practice, however, this is usually only a concern for consumers that are not real-time.
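Similarly on the consumer side, memory usage scales roughly with the number of assigned partitions times the per-partition fetch size; a sketch with the standard Java consumer settings (values illustrative):

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class ConsumerMemorySettings {
    public static void main(String[] args) {
        Properties props = new Properties();
        // Maximum data fetched per partition per request; memory scales roughly with
        // (partitions assigned to this consumer) * max.partition.fetch.bytes.
        props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, 1024 * 1024);
        // Overall cap on a single fetch response.
        props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, 50 * 1024 * 1024);
    }
}
```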
Conclusion
In general, more partitions in a Kafka cluster means higher throughput. However, you must be aware of the impact that the total number of partitions, or too many partitions per broker, can have on availability and latency; this usually requires planning and tuning by both the business side and the underlying platform side.
Related documents
- kafka-supports-200k-partitions-per-cluster
- kafka-topic-partition-numbers