In this article, a software engineer presents 20 best practices for Apache Kafka in large applications.

Apache Kafka is a popular distributed data-streaming platform used by large companies such as New Relic, Uber, and Square (the mobile-payments company) to build scalable, high-throughput, highly reliable real-time streaming systems. In New Relic's production environment, for example, Kafka clusters process well over 15 million messages per second, for an aggregate data rate approaching 1 Tbps.

As you can see, Kafka takes much of the complexity out of processing streams of data, which is why it is popular with application developers and data-management specialists. Still, using Kafka in large systems can be complicated. If your consumers can't keep up with the data stream, messages can disappear before they are ever seen. Limitations around automatic data retention, high-traffic publish/subscribe (pub/sub) patterns, and the like can also affect system performance. It's no exaggeration to say that if the systems housing your data streams can't scale on demand, or simply aren't reliable, you will lose sleep over them.

To reduce that complexity, I'm sharing 20 best practices New Relic follows for Kafka clusters that handle high throughput, grouped into the following four areas:

  1. Partitions
  2. Consumers
  3. Producers
  4. Brokers

Get a quick understanding of Kafka’s concepts and architecture

Kafka is an efficient distributed messaging system. It provides built-in data redundancy and resiliency along with high throughput and scalability. Among other things, it supports automatic data-retention limits, lets applications process data as streams, and supports "compacted" streams that model data as a map of key/value pairs.

To understand the various best practices, you need to familiarize yourself with the following key terms:

  • Message: A record or unit of data within Kafka. Each message has a key and a value, and optionally headers.
  • Producer: Producers publish messages to Kafka topics. The producer decides which topic partition to publish to, either at random (round-robin) or via a partitioning algorithm based on a message's key.
  • Broker: Kafka runs as a distributed system, or cluster. Each node in the cluster is called a broker.
  • Topic: A topic is a category of data records, or messages, that are published. Consumers subscribe to a topic in order to read the data written to it.
  • Topic partition: Topics are divided into partitions, and each message within a partition is given an offset. Each partition is typically replicated at least once or twice. Every partition has a leader and one or more replicas (copies of the data) that live on followers, which protects against the failure of a broker. All brokers in the cluster are both leaders and followers, but a broker holds at most one replica of a given topic partition. The leader is used for all reads and writes.
  • Offset: Each message in a single partition is assigned an Offset, which is a monotonically increasing integer that can be used as a unique identifier for messages in the partition.
  • Consumer: Consumers read messages from Kafka topics by subscribing to topic partitions. The consuming application then processes each message to accomplish whatever work is required.
  • Consumer group: Consumers can be organized logically into consumer groups. Topic partitions are balanced across all the consumers in a group, so the consumers in the same group operate in a load-balanced fashion; in other words, each message in the topic is seen by only one consumer in the group. If a consumer goes offline, its partitions are reassigned to the remaining consumers in the group; this is called "rebalancing." If there are more consumers in a group than partitions, some consumers will sit idle. Conversely, if there are fewer consumers than partitions, some consumers will receive messages from more than one partition.
  • Lag: A consumer lags when it cannot keep up with the rate at which messages are produced and falls behind in reading from a partition. Lag is expressed as the number of offsets behind the head of the partition. The time needed to recover from a lagged state (to "catch up") depends on how many more messages per second the consumer can consume than are being produced. The formula is as follows:

time = messages / (consume rate per second – produce rate per second)
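
As a hypothetical illustration of the formula: a consumer that is 600,000 offsets behind, consuming 2,000 messages per second while producers write 1,000 messages per second, needs roughly 600,000 / (2,000 - 1,000) = 600 seconds, or ten minutes, to catch up.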

Best practices for Partitions

  • Understand the data rate of your partitions to ensure you provision the correct amount of retention space. The "data rate of a partition" is the rate at which data is produced to it; in other words, the average message size multiplied by the number of messages per second. The data rate dictates how much retention space, in bytes, is required to guarantee retention for a given amount of time. If you don't know the data rate, you can't correctly calculate the storage needed to meet a time-based retention goal. The data rate also specifies the minimum throughput a single consumer must sustain without lagging. (A back-of-the-envelope sizing sketch follows this list.)
  • Unless you have other architectural needs, use random partitioning when writing to topics (see the producer sketch at the end of this section). When you operate at scale, uneven data rates across partitions can be difficult to manage, for three reasons:
    • First, consumers of the "hot" (higher-throughput) partition have to process more messages than the other consumers in the group, which is likely to cause bottlenecks in both processing and networking.
    • Second, topic retention must be sized for the partition with the highest data rate, which can result in increased disk usage across the topic's other partitions.
    • Third, determining a balanced distribution of partition leadership is more complex than simply spreading leadership across all brokers; a "hot" partition may "carry" 10 times the weight of another partition in the same topic.
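
To make the sizing arithmetic concrete, here is a minimal back-of-the-envelope sketch; every number and variable name in it is a hypothetical assumption for illustration, not a figure from the article:

```java
// Back-of-the-envelope retention sizing: bytes needed per partition is the
// average message size times messages per second times the retention window;
// multiply by the replication factor for cluster-wide disk usage.
public class PartitionSizing {
    public static void main(String[] args) {
        double avgMessageBytes = 1_000;     // assumed average message size
        double messagesPerSecond = 5_000;   // assumed per-partition data rate
        double retentionHours = 72;         // desired retention window
        int replicationFactor = 3;          // copies kept across the cluster

        double bytesPerPartition =
                avgMessageBytes * messagesPerSecond * retentionHours * 3600;
        double clusterWideBytes = bytesPerPartition * replicationFactor;

        System.out.printf("Per-partition retention: %.0f GB%n", bytesPerPartition / 1e9);
        System.out.printf("Cluster-wide with replication: %.0f GB%n", clusterWideBytes / 1e9);
    }
}
```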

For more on topic partitioning, see "Effective Strategies for Kafka Topic Partitioning" (https://blog.newrelic.com/engineering/effective-strategies-kafka-topic-partitioning/).
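
As a minimal illustration of the random-partitioning advice (the broker address and topic name below are placeholders): with the Java client, leaving the record key null lets the default partitioner spread records across partitions, round-robin in older clients and sticky partitioning from 2.4 onward.

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class RandomPartitioningExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder address
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // No key: the default partitioner spreads records across partitions
            // (round-robin on older clients, sticky partitioning since 2.4),
            // which avoids the "hot" partitions caused by skewed keys.
            producer.send(new ProducerRecord<>("events", "some payload"));

            // With a key, every record sharing that key lands on one partition:
            // useful for per-key ordering, but a skewed key distribution can
            // create exactly the hot-partition problems described above.
            producer.send(new ProducerRecord<>("events", "user-42", "some payload"));
        }
    }
}
```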

Best practices for Consumers

  • If your consumers are running a version of Kafka older than 0.10, upgrade them immediately. In version 0.8.x, consumers use Apache ZooKeeper for consumer-group coordination, and a number of known bugs can leave a group permanently rebalancing or cause the rebalancing algorithm to fail outright (what we call a "rebalance storm"). During a rebalance, one or more partitions are assigned to each consumer in the group; in a rebalance storm, partition ownership bounces continually from one consumer to another, preventing any consumer from making real progress on its partitions.
  • Tune your consumers' socket buffers to handle high-speed data ingest (see the consumer sketch after this list). In Kafka 0.10.x the parameter is receive.buffer.bytes, which defaults to 64 kB; in Kafka 0.8.x it is socket.receive.buffer.bytes, which defaults to 100 kB. Both defaults are too small for high-throughput environments, particularly when the bandwidth-delay product between the broker and the consumer is larger than that of a local area network (LAN). For high-bandwidth networks (10 Gbps or more) with latencies of 1 millisecond or higher, consider setting the socket buffer to 8 or 16 MB. If memory is scarce, consider at least 1 MB. You can also set the value to -1, which lets the underlying operating system tune the buffer size based on network conditions; however, that automatic tuning may not ramp up quickly enough for consumers that need to start on "hot" partitions.
  • Design high-throughput consumers to apply back-pressure when warranted. In general, the system should process only as much data as it can handle, rather than over-consuming and ending up with a stalled or "hung" process or an overwhelmed consumer group. If the consumer runs in a Java Virtual Machine (JVM), use fixed-size buffers (see the Disruptor pattern: http://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf), preferably allocated off-heap. A fixed-size buffer keeps the consumer from pulling so much data onto the heap that the JVM spends all of its time on garbage collection instead of its essential job: processing messages.
  • When running consumers on a JVM, be aware of the impact garbage collection can have on them. For example, long garbage-collection pauses can drop ZooKeeper sessions or trigger consumer-group rebalances. The same holds for brokers, which risk falling out of the cluster if garbage-collection pauses run too long.
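
A minimal consumer sketch tying the points above together, written against a 2.x Java client; the broker address, group id, topic, queue size, and other numbers are illustrative assumptions rather than recommendations from the article:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TunedConsumer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("group.id", "example-group");              // placeholder
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        // Larger socket buffer for high bandwidth-delay paths (8 MB here);
        // -1 would delegate sizing to the operating system instead.
        props.put("receive.buffer.bytes", 8 * 1024 * 1024);
        // Bound how much each poll() can pull: another back-pressure lever.
        props.put("max.poll.records", 500);

        // Fixed-size hand-off queue: when downstream processing falls behind,
        // put() blocks and the poll loop slows down instead of piling records
        // onto the heap. (A separate worker pool would drain this queue, and
        // blocking too long still has to respect max.poll.interval.ms.)
        BlockingQueue<ConsumerRecord<String, String>> buffer =
                new ArrayBlockingQueue<>(10_000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("events")); // placeholder topic
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                for (ConsumerRecord<String, String> record : records) {
                    buffer.put(record); // blocks when the buffer is full
                }
            }
        }
    }
}
```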

Best practices for Producers

  • Configure your producers to wait for acknowledgments, so the producer knows whether a message actually reached the partition on the broker. On Kafka 0.10.x the setting is acks; on 0.8.x it is request.required.acks. Kafka provides fault tolerance through replication, so the failure of a single node or a change in partition leadership does not affect the availability of the system. If you configure producers without acks ("fire and forget"), messages can be silently lost. (A producer configuration sketch follows this list.)
  • Configure retries on your producers. The default value is 3, which is often too low. The right value depends on your application; for applications with zero tolerance for data loss, consider Integer.MAX_VALUE (effectively infinite). This guards against situations where the broker leading the partition cannot respond to a produce request right away.
  • For high-throughput producers, tune buffer sizes, particularly buffer.memory and batch.size (which is counted in bytes). Because batch.size applies per partition, producer performance and memory usage correlate with the number of partitions in the topic. The right values depend on the producer's data rate (both the size and the number of messages), the number of partitions you are producing to, and the amount of memory available. Keep in mind that larger buffers are not always better: if the producer stalls for some reason (for example, one leader is slower to respond with acknowledgments), more data buffered on-heap means more garbage collection.
  • Instrument your application to track metrics such as the number of messages produced, the average message size, and the number of messages consumed.
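
A producer configuration sketch reflecting the settings above, written against a modern Java client; the broker address, topic, and specific numbers are illustrative assumptions, not recommendations from the article:

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class DurableProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // placeholder
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        // Wait for the in-sync replicas to acknowledge ("acks" on 0.10+,
        // request.required.acks on 0.8.x); avoids fire-and-forget data loss.
        props.put("acks", "all");
        // Effectively infinite retries for zero-tolerance-to-loss applications.
        props.put("retries", Integer.MAX_VALUE);
        // Buffer tuning for throughput: batch.size is per partition, so total
        // memory pressure grows with the number of partitions being written.
        props.put("batch.size", 64 * 1024);            // 64 kB batch per partition
        props.put("linger.ms", 10);                    // small wait to fill batches
        props.put("buffer.memory", 64L * 1024 * 1024); // 64 MB total send buffer

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("events", "key", "value"),
                    (metadata, exception) -> {
                        if (exception != null) {
                            // Surface failures; also a natural place to update
                            // application metrics (messages produced, sizes, etc.).
                            exception.printStackTrace();
                        }
                    });
            producer.flush();
        }
    }
}
```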

Best practices for Brokers

  • Compacted topics require memory and CPU resources on each broker. Log compaction (see https://kafka.apache.org/documentation/#compaction) needs both heap (memory) and CPU cycles on the brokers to complete successfully, and failed log compaction puts brokers at risk from partitions that grow without bound. You can tune log.cleaner.dedupe.buffer.size and log.cleaner.threads on your brokers, but keep in mind that both values affect heap usage on every broker. If a broker throws an OutOfMemoryError, it will shut down and potentially lose data. The buffer size and thread count depend on both the number of topic partitions to be cleaned and the data rate and key size of the messages in those partitions. As of Kafka 0.10.2.1, monitoring the log-cleaner log file for ERROR entries is the surest way to detect problems with log-cleaner threads.
  • Monitor network throughput on your brokers. Track both transmit (TX) and receive (RX) traffic, as well as disk I/O, disk space, and CPU usage. Capacity planning is key to maintaining overall cluster performance.
  • Distribute partition leadership among the brokers in the cluster. Leadership requires a great deal of network I/O. For example, when running with a replication factor of 3, a leader must first receive the partition's data, then transmit two copies to its followers, and then transmit to however many consumers want that data. So, in this example, being a leader is at least four times as expensive in network I/O as being a follower. Leaders may also have to read from disk, whereas followers only write.
  • Don't neglect to monitor your brokers for in-sync replica (ISR) shrinks, under-replicated partitions, and unpreferred leaders. These are all signs of potential problems in the cluster. For example, frequent ISR shrinks for a single partition can indicate that the partition's data rate exceeds the leader's ability to service the consumer and replica threads.
  • Modify the Apache Log4j properties as needed (https://github.com/apache/kafka/blob/trunk/config/log4j.properties). Kafka broker logging can consume an excessive amount of disk space, but don't turn it off entirely: broker logs can be the best, and sometimes the only, way to reconstruct the sequence of events after an incident.
  • Either disable automatic topic creation or establish a clean-up policy for topics that go unused. For example, if no new messages have arrived for x days, consider the topic defunct and remove it from the cluster. This keeps you from having to manage additional metadata in the cluster. (A clean-up sketch follows this list.)
  • For brokers with sustained high throughput, provision enough memory to avoid reading from the disk subsystem. Whenever possible, partition data should be served straight from the operating system's file-system cache. However, this means you have to make sure your consumers keep up; a lagging consumer forces the broker to read from disk.
  • For large clusters with high-throughput service level objectives (SLOs), consider isolating some topics onto a subset of brokers. How you decide which topics to isolate is entirely up to your business needs. For example, if multiple online transaction processing (OLTP) systems use the same cluster, isolating each system's topics to its own subset of brokers helps limit the potential blast radius of an incident.
  • Be careful about using newer topic message formats with older clients. When the formats don't match, the brokers must convert messages on the clients' behalf, which places extra load on the brokers; avoid this whenever possible.
  • Don't make the mistake of thinking that testing a broker on localhost is representative of the performance you'll see in production. Keep in mind that testing against a single partition with a replication factor of 1 over the loopback interface is a very different topology from most production environments: network latency over loopback is negligible, and the time needed to receive the leader's acknowledgment varies greatly when no replication is involved.
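
As a sketch of the topic clean-up idea above, here is one way to retire an unused topic with the Java AdminClient (the AdminClient requires 0.11+ clients, and incrementalAlterConfigs requires 2.3+); the topic name, broker address, and retention value are placeholders:

```java
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class TopicCleanup {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder

        try (AdminClient admin = AdminClient.create(props)) {
            // Option 1: shorten retention on a suspected-defunct topic so its
            // data ages out before you remove it.
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "stale-topic");
            AlterConfigOp shorterRetention = new AlterConfigOp(
                    new ConfigEntry("retention.ms", "86400000"), // 1 day
                    AlterConfigOp.OpType.SET);
            admin.incrementalAlterConfigs(
                    Collections.singletonMap(topic, Collections.singleton(shorterRetention)))
                 .all().get();

            // Option 2: once nothing reads or writes it, delete the topic so its
            // metadata no longer has to be managed by the cluster.
            admin.deleteTopics(Collections.singletonList("stale-topic")).all().get();
        }
    }
}
```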

Other resources

Hopefully, these suggestions help you use Kafka more effectively. If you want to deepen your Kafka expertise, check out the "Operations" section of the Kafka documentation, which contains hands-on information about running a cluster. In addition, Confluent (https://www.confluent.io/) regularly holds and publishes online talks that can help you understand Kafka better.

20 Best Practices for Working With Apache Kafka at Scale by Tony Mancill

Original: http://os.51cto.com/art/201808/582379.htm