Apache Kafka production environment: cluster resource planning and configuration


Kafka cluster resource planning

You can refer to Confluent's deployment recommendations:

docs.confluent.io/current/kaf…

Operating system

Kafka's source code includes both Scala and Java files; it belongs to the family of JVM-based big data frameworks. Java is a cross-platform language, and the compiled code runs on JVMs on different operating systems. Kafka nevertheless behaves somewhat differently across operating systems, and those differences can have a considerable impact on a Kafka cluster.

In actual production, the vast majority of Kafka clusters are still deployed on Linux servers. Three factors are generally considered:

  1. I/O model support provided by the underlying operating system
  2. The efficiency of data transfer over the network
  3. Community activity and support

The mainstream I/O models are as follows:

  • Blocking I/O (BIO)

Synchronous blocking: a thread that makes an I/O call blocks while control transfers to kernel space, and data is returned only after the entire I/O operation completes. Trade-off: typically one thread is allocated per I/O request, resulting in high system overhead. Typical applications: blocking sockets, Java BIO.

  • Non-blocking I/O (NIO)

Synchronous non-blocking: the thread keeps polling the kernel's I/O device buffer; the call returns EWOULDBLOCK immediately if no data is available, or returns the data if it is. Trade-off: higher CPU consumption and many wasted I/O calls. Typical application: non-blocking sockets (fd set to NONBLOCK).

  • I/O multiplexing (Multiplexing I/O)

The thread calls select/poll/epoll, passing in multiple device fds, and then blocks or polls while waiting. If an I/O device is ready, the call returns its readable/writable condition and the user thread actively performs the read or write; otherwise the thread continues to wait. Trade-off: compared with the first two models, I/O multiplexing can monitor multiple I/O devices, so a single thread can handle many network requests. Typical applications: select, poll, and epoll; Nginx and Java NIO are based on this I/O model.

  • Signal-driven I/O

Using the Linux signal mechanism, the sigaction call registers a handler callback for the SIGIO read/write signal with the kernel. When the device's I/O buffer becomes writable or readable, a SIGIO interrupt is triggered, the device fd is returned, and the handler is called back.

  • Asynchronous I/O (AIO)

There are two main implementations of Linux AIO: glibc AIO and libaio (kernel AIO). Windows provides IOCP (I/O completion ports) for asynchronous I/O.

Kafka’s relationship to the I/O model

Kafka's Selector: KafkaServer, KafkaProducer, and KafkaConsumer all directly use Kafka's own Selector, which is built on the Java NIO channels API and internally holds a private final java.nio.channels.Selector nioSelector field.

Kafka's Selector implementation: clients/src/main/java/org/apache/kafka/common/network/Selector.java

// ...
import java.nio.channels.CancelledKeyException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnresolvedAddressException;

// A nioSelector interface for doing non-blocking multi-connection network I/O.
public class Selector implements Selectable, AutoCloseable {
  // ...
}

The Java NIO Selector is implemented with epoll on Linux and with select on Windows, so on this point alone, deploying Kafka on Linux servers yields more efficient I/O performance.
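To make the multiplexing model concrete, here is a minimal, self-contained sketch of the kind of Java NIO selector loop that Kafka's Selector builds on. This is illustrative only, not Kafka's code; the port and buffer size are arbitrary choices.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;

public class MultiplexingDemo {
    public static void main(String[] args) throws IOException {
        Selector selector = Selector.open();
        ServerSocketChannel server = ServerSocketChannel.open();
        server.bind(new InetSocketAddress(9092));   // illustrative port
        server.configureBlocking(false);            // selectors require non-blocking channels
        server.register(selector, SelectionKey.OP_ACCEPT);

        while (true) {
            selector.select();                      // block until at least one channel is ready
            Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
            while (keys.hasNext()) {
                SelectionKey key = keys.next();
                keys.remove();
                if (key.isAcceptable()) {           // new connection: register it for reads
                    SocketChannel client = server.accept();
                    client.configureBlocking(false);
                    client.register(selector, SelectionKey.OP_READ);
                } else if (key.isReadable()) {      // one thread services many connections
                    SocketChannel client = (SocketChannel) key.channel();
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    if (client.read(buf) < 0) client.close();
                }
            }
        }
    }
}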

Network transmission efficiency

KafkaProducer sends messages to the Broker, and KafkaConsumer consumes messages from the Broker, over the network; the messages themselves are persisted to disk. Kafka therefore performs a lot of data transfer between disk and network. When data moves between disk and network, copies through the operating system's user space should be avoided to achieve fast transfer. Linux implements a zero-copy mechanism, and a similar mechanism is available on the Windows platform as of JDK 8, so current JDK versions are fine either way.

Speaking of zero copy, the first question that comes to mind is: what does a non-zero copy look like? Where do the copies happen, and from where to where?

Non-zero copy

The application process initiates a read request. After receiving it, the kernel checks whether the data the process needs is already in kernel space; if so, the data is copied directly into the process buffer. If not, the kernel issues a command to the disk controller to read the data from disk, and the disk controller writes the data directly into the kernel's read buffer via DMA. The kernel then copies the data into the application process's buffer. If the application process then initiates a write request, the data must likewise be copied from the user buffer into the kernel's socket buffer, and then copied to the NIC via DMA and sent out.

The point is that data must be copied between kernel space and user space every time. Four locations are involved: the kernel buffer, the user (process) buffer, the disk, and the network. Reading data from disk and sending it to the network goes: disk -> kernel buffer (read) -> user buffer -> kernel buffer (socket) -> network. It would be better to transfer data directly from the kernel buffer (read) to the kernel buffer (socket), keeping the transfer entirely inside kernel space. And if only the memory address and offset of the data in the kernel buffer are passed to the socket buffer, even that single CPU copy within kernel space is eliminated, and the transfer becomes a zero copy.

Zero copy in Java:

  • MappedByteBuffer: virtual memory mapping
  • DirectByteBuffer: inherits from MappedByteBuffer
  • sendfile: Channel-to-Channel transfer

On Linux, two approaches are used: (1) mmap + write and (2) sendfile. Kafka's FileRecords#writeTo uses FileChannel#transferTo, which is backed by sendfile; MappedByteBuffer is also used in the transferTo and transferFrom implementations.
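To make the sendfile path concrete, here is a minimal sketch (not Kafka's actual code) of pushing a file to a socket with FileChannel#transferTo, so the bytes never enter user space. The file path and destination address are placeholders.

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.FileChannel;
import java.nio.channels.SocketChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class ZeroCopySend {
    public static void main(String[] args) throws IOException {
        try (FileChannel file = FileChannel.open(Paths.get("/tmp/segment.log"),
                                                 StandardOpenOption.READ);
             SocketChannel socket = SocketChannel.open(new InetSocketAddress("localhost", 9999))) {
            long position = 0;
            long remaining = file.size();
            // transferTo is backed by sendfile(2) on Linux; it may transfer fewer
            // bytes than requested, so loop until the whole file has been sent.
            while (remaining > 0) {
                long sent = file.transferTo(position, remaining, socket);
                position += sent;
                remaining -= sent;
            }
        }
    }
}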

Community support has always been Linux-first; bugs on the Windows platform are rarely fixed.

So, naturally, the production environment should be Linux servers.

Disk

Disk planning is very important for Kafka, and typically involves choosing between a mechanical disk and an SSD.

  • Mechanical disk: low cost and large capacity, but prone to failure
  • SSD: better performance, but a higher price per unit of capacity

Because Kafka writes and reads its logs sequentially, ordinary mechanical hard disks are perfectly usable.

Kafka uses disks heavily, but its sequential reads and writes sidestep the biggest weakness of mechanical disks: slow random access. Against that access pattern, SSDs offer little extra performance. Mechanical disks are cheap, after all, and Kafka compensates for their fragility at the software level with its multi-replica mechanism.

Advantages of disk arrays (RAID):

  • Provides redundant disk storage space
  • Provides load balancing

These two advantages are attractive to any distributed system. However, Kafka already achieves reliability through its own multi-replica mechanism, and by partitioning data for storage and reads/writes it also achieves load balancing at the software level. Hence:

  • Common disks can be used instead of RAID
  • Mechanical disks are fully capable of supporting a production Kafka environment

Disk capacity

How much storage can a Kafka cluster expect to require?

Kafka stores messages on the underlying disks; by default they are retained for a period of time and then deleted automatically. Although this period is configurable, you still need to plan the cluster's storage capacity based on your business scenarios and retention requirements.

Suppose 100 million messages are sent to the Kafka cluster every day. To guard against message loss, each message is kept in two copies, and messages are retained for two weeks before being deleted automatically. If the average message size is 1KB, one day of data occupies 100 million * 1KB * 2 / 1000 / 1000 = 200GB. Reserving an extra 10% of disk space for other data (indexes and so on) brings the daily total to about 220GB. Over two weeks that is 220GB * 14, or roughly 3TB. If messages are compressed, assuming a compression ratio of 0.75, the requirement becomes 0.75 * 3 = 2.25TB.
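The same arithmetic as a small sketch; every input is one of the assumptions above and should be replaced with your own numbers:

public class KafkaDiskSizing {
    public static void main(String[] args) {
        long messagesPerDay = 100_000_000L; // daily message count (assumption)
        double messageKb = 1.0;             // average message size in KB (assumption)
        int copies = 2;                     // copies kept per message
        int retentionDays = 14;             // retention period
        double indexOverhead = 0.10;        // extra space for indexes etc.
        double compressionRatio = 0.75;     // assumed compression ratio

        double dailyGb = messagesPerDay * messageKb * copies / 1000 / 1000;  // = 200 GB
        double withOverheadGb = dailyGb * (1 + indexOverhead);               // = 220 GB
        double totalTb = withOverheadGb * retentionDays / 1000;              // ~ 3 TB
        double compressedTb = totalTb * compressionRatio;                    // ~ 2.3 TB
        System.out.printf("Plan for about %.2f TB of disk capacity%n", compressedTb);
    }
}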

For storage, the following factors are considered:

  1. Number of new messages added (incremental)
  2. Message retention time
  3. Average message size
  4. Number of copies, such as 2 to 3
  5. Whether to enable compression

Network bandwidth

Objectively, the network can become Kafka's bottleneck, because a large amount of data is transferred over it: producers sending messages to the Broker, consumers fetching messages from the Broker, and Follower replicas synchronizing from Leader replicas (typically over the intranet).

On ordinary Ethernet, two bandwidths are common: 1Gbps (gigabit) and 10Gbps (10-gigabit); gigabit networking in particular should be standard for a typical corporate network.

1Gbps = 1,000Mbps, with a lowercase b: uppercase B stands for byte, lowercase b stands for bit, and 1 byte equals 8 bits.

With 1Gbps of bandwidth, each machine can move roughly 1Gb of data per second. Assume each Kafka server runs on a dedicated machine, i.e., no other services are co-located with Kafka (mixed deployment is not recommended in a real environment anyway). Assume further that Kafka can use at most 70% of the bandwidth, since some headroom must always be left for other applications and processes; a single Kafka server can then use at most about 700Mb of bandwidth. That is the peak, so plan for ordinary usage at about one third of it: 700Mb / 3 ≈ 240Mbps. So how many Kafka machines are needed to process 1TB of data per hour? 1TB per hour is about 2330Mb of data per second (1024 * 1024 * 8 / 3600 ≈ 2330), and 2330Mb / 240Mbps ≈ 10, i.e., about 10 servers. Adding replicas at a factor of 3 triples that, for a final total of about 30 Kafka nodes.
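The sizing above as a sketch, under the same assumptions (a 70% bandwidth cap and planning at one third of peak):

public class KafkaBandwidthSizing {
    public static void main(String[] args) {
        double linkMbps = 1000;            // 1 Gbps link
        double kafkaShare = 0.70;          // cap Kafka at 70% of the link (assumption)
        double typicalUse = 1.0 / 3.0;     // plan at ~1/3 of the peak (assumption)
        double perBrokerMbps = linkMbps * kafkaShare * typicalUse;   // ~ 233 Mbps (~240)

        double tbPerHourInMbps = 1024 * 1024 * 8 / 3600.0;           // 1 TB/hour ~ 2330 Mb/s
        double brokers = Math.ceil(tbPerHourInMbps / perBrokerMbps); // ~ 10
        int replicationFactor = 3;
        System.out.printf("About %.0f brokers, x%d for replicas = %.0f nodes%n",
                brokers, replicationFactor, brokers * replicationFactor);
    }
}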

Kafka cluster parameters

Broker-side parameters

server.properties

  • broker.id: the broker ID for this server; if unset, a unique broker ID will be generated
  • log.dirs: specifies the set of file directory paths the Broker uses, and should be specified manually
  • log.dir: a single path only, used to supplement the log.dirs parameter
  • zookeeper.connect
  • listeners
  • advertised.listeners
  • auto.create.topics.enable
  • unclean.leader.election.enable
  • auto.leader.rebalance.enable
  • log.retention.{hours|minutes|ms}
  • log.retention.bytes
  • message.max.bytes

broker.id: to avoid conflicts between ZooKeeper-generated broker IDs and user-configured broker IDs, generated broker IDs start from reserved.broker.max.id + 1.

Type: int | Default: -1 | Importance: high | Update Mode: read-only

Just set log.dirs; you don't need to set log.dir.

# A comma separated list of directories under which to store log files
# Do not leave this under /tmp in a production environment.
log.dirs=/tmp/kafka-logs

log.dirs: The directories in which the log data is kept. If not set, the value in log.dir is used

Type: string | Default: null | Importance: high | Update Mode: read-only

log.dir

log.dir: The directory in which the log data is kept (supplemental for log.dirs property)

Type: string | Default: /tmp/kafka-logs | Importance: high | Update Mode: read-only

In a production environment you must configure multiple paths for log.dirs, in comma-separated CSV format, such as /xxxx/kafka1,/xxxx/kafka2,/xxxx/kafka3. If possible, make sure these directories are mounted on different physical disks. This has two advantages (a configuration sketch follows the list):

  • Improved read/write performance: The read/write throughput of multiple physical disks is higher than that of a single disk.
  • Failover capability: since Kafka 1.1, if a disk fails, its replicas are automatically rebuilt on other healthy disks and the Broker keeps working. This is the basis for Kafka's ability to move away from disk arrays (RAID).
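A sketch of such a configuration, with three hypothetical mount points, each on its own physical disk:

# server.properties: each path mounted on a different physical disk (hypothetical mounts)
log.dirs=/mnt/disk1/kafka-logs,/mnt/disk2/kafka-logs,/mnt/disk3/kafka-logs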

Parameters related to ZooKeeper

zookeeper.connect

zookeeper.connect: Specifies the ZooKeeper connection string in the form hostname:port where host and port are the host and port of a ZooKeeper server.

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma separated host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
# hostname1:port1,hostname2:port2,hostname3:port3
zookeeper.connect=localhost:2181

# Timeout in ms for connecting to zookeeper
zookeeper.connection.timeout.ms=6000

The general form is hostname1:port1,hostname2:port2,hostname3:port3. When multiple Kafka clusters share the same ZooKeeper ensemble, append a chroot per cluster:

  • kafka1

hostname1:port1,hostname2:port2,hostname3:port3/chroot/kafka1

  • kafka2

hostname1:port1,hostname2:port2,hostname3:port3/chroot/kafka2

Parameters associated with the Broker connection

# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
# For intranet access, this can be set to an intranet IP address.
#listeners=PLAINTEXT://:9092

# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
# This can be set to an external IP address.
#advertised.listeners=PLAINTEXT://your.host.name:9092

PLAINTEXT indicates plaintext transmission, SSL indicates encrypted transmission using SSL or TLS, and so on. You can also define custom listener names, such as CONTROLLER://localhost:9092 (mapped to a security protocol via listener.security.protocol.map).

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL

It is best to use host names throughout, that is, configure host names rather than IP addresses in both the Broker and the client applications.
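For example, a dual-listener setup might look like the following sketch (the listener names, host names, and ports are hypothetical):

# One listener for intranet traffic, one advertised to external clients
listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
advertised.listeners=INTERNAL://kafka1.internal.example:9092,EXTERNAL://kafka1.example.com:9093
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL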

Parameters about Topic management

  • auto.create.topics.enable: whether to allow automatic Topic creation.
  • unclean.leader.election.enable: whether to allow unclean Leader election.
  • auto.leader.rebalance.enable: whether to allow periodic Leader elections.

It is best not to create topics automatically in a production environment. There should also be a dedicated application and review process for every topic to be used, strictly controlled by operations; no one should be allowed to create topics on their own.

In a Kafka Leader election, not every replica can be elected Leader; only replicas that hold enough of the data are eligible. What if all the well-caught-up replicas are down? If unclean.leader.election.enable is false, replicas that have fallen too far behind are simply not allowed to run for Leader. If it is true, Kafka allows one of the lagging replicas to become Leader. The consequence is possible data loss, because such a replica did not hold all the data in the first place.

If auto.leader.rebalance.enable is set to true, Kafka will periodically re-run Leader elections for some Topic partitions: not electing a Leader because one is missing, but replacing the current Leader with a new one. Changing the Leader is costly, since all client requests must switch over to the new Leader, so it is recommended to set this to false.

Parameters for data retention

  • log.retention.{hours|minutes|ms}: controls how long message data is retained. In terms of priority, ms is the highest, minutes next, and hours the lowest.
  • log.retention.bytes: specifies the total disk capacity the Broker may use to hold messages.
  • message.max.bytes: controls the maximum message size the Broker will accept.
# The minimum age of a log file to be eligible for deletion due to age.
# 168 hours means data older than 7 days is deleted automatically.
# Many companies use Kafka as storage, so this value can be raised accordingly.
log.retention.hours=168

#log.retention.bytes=1073741824

# The largest record batch size allowed by Kafka; the default is 1000012.
# Production environments should set a larger value based on actual needs.
message.max.bytes

Topic-level parameters

Topic-level parameters override the values of the corresponding global Broker parameters; each Topic can set its own values for them.

Take data retention time: in actual production, topics belonging to different departments can be given retention times suited to their own business needs.

retention.ms: This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the “delete” retention policy. This represents an SLA on how soon consumers must read their data. If set to -1, no time limit is applied.

Type: long | Default: 604800000 | Valid Values: [-1,…] | Server Default Property: log.retention.ms | Importance: medium

  • retention.ms: how long this Topic's messages are kept; the default is 7 days, i.e., the Topic keeps only the last 7 days of messages. Once set, it overrides the Broker-side global value.
  • retention.bytes: how much disk space to reserve for this Topic. Like its global counterpart, this value is mainly useful in multi-tenant Kafka clusters. The default is -1, meaning unlimited disk space.

Topic-level parameters can be set in two ways:

  1. Set them when creating the Topic
  2. Modify them on an existing Topic

Create a Topic:

bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

Use kafka-configs to modify topic-level parameters:

bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760
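(On newer Kafka versions, kafka-configs.sh takes --bootstrap-server instead of --zookeeper.) The same change can also be made programmatically. A minimal sketch using the AdminClient API, assuming Kafka clients 2.3 or later; the broker address and topic name are the same placeholders as above:

import java.util.Collection;
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.AlterConfigOp;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

public class AlterTopicConfig {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "transaction");
            // SET overrides the topic-level max.message.bytes, like --add-config above.
            Map<ConfigResource, Collection<AlterConfigOp>> updates = Collections.singletonMap(
                    topic, Collections.singletonList(new AlterConfigOp(
                            new ConfigEntry("max.message.bytes", "10485760"),
                            AlterConfigOp.OpType.SET)));
            admin.incrementalAlterConfigs(updates).all().get();
        }
    }
}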

JVM parameters

Kafka officially dropped support for Java 7 in version 2.0.0, so use at least Java 8. On the JVM side, the heap size parameter is critical.

Set the JVM heap size to 6GB, which is generally considered a reasonable value in the industry, and explicitly select the G1 collector on Java 8. Set two environment variables:

  • KAFKA_HEAP_OPTS: specifies the heap size.
  • KAFKA_JVM_PERFORMANCE_OPTS: Specifies the GC parameters.
$ export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
$ export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
$ bin/kafka-server-start.sh config/server.properties

Operating System Parameters

OS general concerns:

  • File descriptor limits
  • File system type
  • Swappiness
  • Commit time (flush interval)

“Too many open files”

Most importantly, raise ulimit -n; it is a value worth adjusting for every Java application and project.

$ ulimit -n
256
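256 is far too low for a Broker. A sketch of raising the limit (the exact value and persistence mechanism vary by distribution; 100000 is just a commonly used generous choice):

# Raise the limit for the current shell session
$ ulimit -n 100000

# Persist it for the kafka user, e.g. in /etc/security/limits.conf
kafka  soft  nofile  100000
kafka  hard  nofile  100000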

Select a file system type

According to test reports on the official website, XFS is better than ext4, so it’s best to use XFS in a production environment, or try ZFS.

Swap tuning

Swappiness can be set to a small value rather than 0. Once it is 0, when physical memory is exhausted the OS triggers the OOM Killer, which picks a process and kills it without giving the user any warning at all. With a small nonzero value, when swap space starts being used we can at least observe Broker performance beginning to drop sharply, which leaves time to tune further and diagnose the problem. For this reason, it is recommended to configure swappiness to a value close to but not equal to 0, such as 1.
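For example (a sketch; the persistent-configuration file can vary by distribution):

# Take effect immediately
$ sudo sysctl vm.swappiness=1

# Persist across reboots, e.g. in /etc/sysctl.conf
vm.swappiness=1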

Commit time (flush interval)

Writing data to Kafka is not considered successful only once the data physically reaches the disk; it is considered successful as soon as the data is written to the operating system's Page Cache. The operating system then periodically flushes the "dirty" pages in the Page Cache to the physical disk. The interval is determined by the commit time, which is 5 seconds by default. That is often considered too frequent, and the interval can be increased appropriately to reduce physical disk writes. The natural question: if the machine goes down before the data in the Page Cache has been written to disk, that data is lost. True, but given that Kafka already provides multi-replica redundancy at the software level, trading a slightly longer commit interval for performance is a reasonable deal.

Page Cache flush policy

  1. The page cache is too full: more pages are needed, or the number of dirty pages is already too high.
  2. Too much time has passed since a page became dirty.
  3. A user process triggers it by calling the sync(), fsync(), or fdatasync() system calls.

The first two can be adjusted through system parameters. Use the sysctl -a | grep dirty command to view the default configuration:

# sysctl -a | grep dirty
vm.dirty_background_bytes = 0
vm.dirty_background_ratio = 10
vm.dirty_bytes = 0
vm.dirty_expire_centisecs = 3000
vm.dirty_ratio = 20
# The 5 seconds mentioned above is this value (500 centiseconds).
# Turning it down increases the flush frequency, getting dirty data to disk
# sooner, but make sure each flush can complete within the interval.
vm.dirty_writeback_centisecs = 500
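To adjust it, for example (the value here is purely illustrative):

# Flush every 10 seconds instead of every 5 (illustrative)
$ sudo sysctl vm.dirty_writeback_centisecs=1000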

Page Cache viewing tool

git clone --depth 1 https://github.com/brendangregg/perf-tools
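Within perf-tools, the cachestat tool can be used to observe page cache hit and miss statistics.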