
Kafka online cluster deployment

1. Operating system

Generally speaking, there are three main operating systems to choose from: Linux, Windows, and macOS.

Kafka is typically deployed on Linux, which is superior in three major ways:

  • Use of the I/O model
  • Data network transmission efficiency
  • Community support

There are usually five types of I/O models: blocking I/O, non-blocking I/O, I/O multiplexing, signal-driven I/O, and asynchronous I/O.

Each I/O model has its own usage scenarios. The blocking and non-blocking modes of Java Socket objects correspond to the first two models. The select system call in Linux belongs to the I/O multiplexing model, while the epoll system call falls somewhere between the third and fourth models. As for the fifth model, Windows provides the IOCP thread model.

Under the hood, the Kafka client uses Java's NIO Selector, which is implemented with epoll on Linux and with select on Windows. Deploying Kafka on Linux therefore gives an advantage in I/O performance.

Second, network transmission efficiency. Kafka producers and consumers send messages over the network, and the messages themselves are stored on disk, so Kafka moves large volumes of data between disk and network. On Linux, Zero Copy technology greatly improves this transmission efficiency.

Zero Copy: fast data transfer achieved by avoiding expensive kernel-mode data copies when moving data between disk and network.

The community currently does not fix Kafka bugs found on Windows. Deploying Kafka on Windows is therefore suitable only for personal testing or functional verification, and should never be used in a production environment.

2. Disks

For Kafka, mechanical hard drives are sufficient to support our daily business. Kafka relies on sequential reads and writes, largely sidestepping the biggest disadvantage of mechanical drives: slow random access.

There is also the question of disk arrays (RAID). The main advantages of RAID are:

  • Provides redundant disk storage space
  • Provide load balancing

Kafka, however, implements its own redundancy mechanism to improve reliability, and it achieves load balancing through the concept of partitions, so RAID brings little extra benefit.

3. Disk capacity

Say a business needs to write 100 million messages a day to Kafka, each message is kept in two copies to prevent data loss, messages are retained for two weeks, and the average message size is 1KB.

Space per day = 100 million × 2 × 1KB / 1024 / 1024 ≈ 190GB

Besides messages, Kafka also stores index data, for which we reserve 10% of the space.

Total space for 14 days = 190GB / 90% × 14 ≈ 2966GB

Kafka also supports data compression. Assuming a compression ratio of 0.75, the final disk size is estimated at 2966GB × 0.75 ≈ 2225GB.
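As a quick sanity check, here is the same estimate as a small script (all the inputs are the example's assumptions above, not measured values):

```bash
# Back-of-the-envelope capacity estimate using the example's assumptions
awk 'BEGIN {
  daily_gb = 100000000 * 2 * 1 / 1024 / 1024   # 100M msgs x 2 copies x 1KB -> ~190 GB/day
  total_gb = daily_gb / 0.9 * 14               # add 10% index overhead, keep 14 days -> ~2966 GB
  final_gb = total_gb * 0.75                   # assume a 0.75 compression ratio -> ~2225 GB
  printf "daily=%.0fGB  14-day=%.0fGB  compressed=%.0fGB\n", daily_gb, total_gb, final_gb
}'
```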

Consider the following when planning disk capacity:

  • Number of new messages
  • Message retention time
  • Average message size
  • Number of backup
  • Whether to enable compression

4. Bandwidth

Bandwidth generally comes in two flavors: 1Gbps Gigabit Ethernet and 10Gbps 10-Gigabit Ethernet.

A bandwidth of 1Gbps means 1 gigabit of data per second, i.e., a transmission speed of 1000 megabits per second.

Suppose our business currently needs to process 1TB of data per hour.

Since Kafka runs on machines shared with other services, let's assume Kafka gets at most 70% of the bandwidth: 1Gbps × 70% = 700Mbps.

That 700Mbps is the ideal maximum; we cannot let the Kafka servers run at that level all the time, so we keep a further two-thirds as headroom, leaving roughly 700Mbps / 3 ≈ 240Mbps of usable bandwidth per server.

Processing 1TB per hour translates to 1024 × 1024MB / 3600s ≈ 291MB of data per second.

1Mbps means 1,000,000 bits are transmitted per second, i.e., 1,000,000 / 8 = 125,000 bytes/s.

Each server can use about 240Mbps of bandwidth.

Final number of machines: 291MB/s × 8 ≈ 2330Mbps, and 2330 / 240 ≈ 10.

If we additionally keep two extra copies of the data, the final server count is 10 × 2 + 10 = 30.
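The same machine-count estimate as a script (again, all inputs are the example's assumptions: 1TB/hour, 1Gbps NICs at 70%, one third of that usable):

```bash
# Rough server-count estimate for the bandwidth example above
awk 'BEGIN {
  mb_per_s  = 1024 * 1024 / 3600        # 1TB/hour -> ~291 MB/s
  mbps_need = mb_per_s * 8              # -> ~2330 Mbps of traffic to move
  mbps_per  = 1000 * 0.70 / 3           # ~233 Mbps usable per server (rounded to 240 in the text)
  n = mbps_need / mbps_per
  base = int(n); if (base < n) base++   # round up -> 10 servers
  printf "base=%d servers, with two extra copies=%d servers\n", base, base * 3
}'
```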

| Factor | Consideration | Advice |
| --- | --- | --- |
| Operating system | The operating system's I/O model | Deploy Kafka on Linux |
| Disk | Disk I/O performance | In ordinary environments, mechanical hard disks are fine and RAID is not required |
| Disk capacity | Estimate capacity from message volume and retention time | Reserve an extra 20% to 30% of disk space |
| Bandwidth | Estimate the server count from available bandwidth and workload | On Gigabit networks, plan around 700Mbps per server to prevent packet loss |

Configure Kafka cluster parameters

1. Broker parameters

The Broker must be configured with the disks it uses to store data.

The first set of parameters is for storage:

  • log.dirs: specifies the set of file directory paths the Broker uses to store log data
  • log.dir: specifies a single path, complementing the parameter above

A typical setting: log.dirs=/home/kafka1,/home/kafka2,/home/kafka3

The benefits of this setup:

  • Improved read/write performance: Multiple disks provide higher throughput
  • Failover: a powerful feature introduced in Kafka 1.1. Previously, the failure of any disk used by the Broker caused the entire Broker process to shut down. Since 1.1, the data on a failed disk is automatically transferred to the other healthy disks and the Broker keeps working: the Broker automatically recreates the replicas on a healthy path and the Leader re-synchronizes them. This ability to move data between paths is precisely why Kafka does not need RAID.
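For the multi-disk setup to pay off, each path should sit on its own physical disk. A quick way to verify that (the mount points are the example paths from above):

```bash
# Confirm each Kafka log directory is backed by a different device
df -h /home/kafka1 /home/kafka2 /home/kafka3
```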

The second set of parameters is the configuration for Zookeeper:

Zookeeper coordinates and manages the Kafka cluster and stores all of its metadata: which Broker machines are in the cluster, Topics, partitions, the locations of Leader replicas, and so on.

  • zookeeper.connect: normally set to zk1:2181,zk2:2181,zk3:2181. If multiple Kafka clusters share the same Zookeeper ensemble, give each cluster its own chroot path, e.g. zk1:2181,zk2:2181,zk3:2181/kafka1 for the first cluster and zk1:2181,zk2:2181,zk3:2181/kafka2 for the second.
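For example, the server.properties of the second of two clusters sharing one Zookeeper ensemble might look like this (host names and the /kafka2 chroot are illustrative):

```properties
# Each cluster gets its own chroot so their metadata cannot collide
zookeeper.connect=zk1:2181,zk2:2181,zk3:2181/kafka2
```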

The third set of parameters is related to Broker connectivity, i.e., how clients or other Brokers communicate with this Broker:

  • listeners: tells connectors which protocol, host name, and port to use to access the Kafka service
  • advertised.listeners: the listeners the Broker advertises to the outside world; generally used for access from an external network

A listener is a comma-separated list of triples in the format <protocol name, host name, port number>. Protocol names include PLAINTEXT for plaintext transmission, SSL for SSL- or TLS-encrypted transmission, and so on. The host part should generally be a host name.
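A minimal sketch of the connection parameters (host names and ports are placeholders):

```properties
# Listener used by brokers and clients inside the network
listeners=PLAINTEXT://kafka-internal.example.com:9092
# Address advertised to clients connecting from outside the network
advertised.listeners=PLAINTEXT://kafka-public.example.com:9092
```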

The fourth set of parameters is about Topic management:

  • auto.create.topics.enable: whether Topics may be created automatically
  • unclean.leader.election.enable: whether Unclean Leader election is allowed
  • auto.leader.rebalance.enable: whether periodic Leader re-election is allowed

The first parameter: we do not recommend allowing Topics to be created automatically, so set auto.create.topics.enable=false.

The second parameter: disable Unclean Leader election. Kafka uses a Leader/Follower replica mechanism, and a Follower replica that lags too far behind the Leader should not be allowed to run for Leader. With unclean.leader.election.enable=false, even if every other replica is down, a Follower that lags too far behind will never be elected Leader; with true, it could be, at the risk of losing data. We normally set it to false.

The third parameter: with auto.leader.rebalance.enable=true, Kafka is allowed to periodically re-elect Leaders for some Topic partitions, which means a perfectly healthy Leader can still be replaced. That wastes a lot of resources, so we recommend setting this parameter directly to false.
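Putting the three recommendations together, a server.properties sketch (note that the defaults differ across Kafka versions):

```properties
# Do not create Topics implicitly on first use
auto.create.topics.enable=false
# Never let a badly lagging Follower become Leader
unclean.leader.election.enable=false
# Do not replace a healthy Leader during periodic rebalancing
auto.leader.rebalance.enable=false
```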

The fifth set of parameters is about data retention:

  • log.retention.{hours|minutes|ms}: controls how long a message is kept
  • log.retention.bytes: specifies how much disk capacity the Broker may use to hold messages
  • message.max.bytes: controls the maximum message size the Broker will accept

The first parameter: we typically set log.retention.hours=168 to keep 7 days of data.

The second parameter: the default is -1, meaning the Broker may hold as much data as you like. It is mainly useful in multi-tenant cloud deployments to limit each tenant's disk space.

The third parameter: it is recommended to set this to a fairly large value, which does no harm; if it is set too small, larger messages will be rejected and effectively lost.
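A matching retention sketch (the 10MB cap is an arbitrary example; the Broker default is roughly 1MB):

```properties
# Keep messages for 7 days
log.retention.hours=168
# -1 (the default): no size-based cap on the Broker
log.retention.bytes=-1
# Accept messages up to 10MB
message.max.bytes=10485760
```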

2. Set Topic parameters

When both topic-level parameters and global Broker parameters are set, the topic-level parameters override the global Broker values.

In daily business, different departments use different Topics, and a single retention time cannot suit every department. Overriding the Broker defaults with topic-level parameters is a good option here.

For message retention:

  • retention.ms: specifies how long this Topic's messages are kept; the default is 7 days
  • retention.bytes: specifies how much disk space is reserved for this Topic

For message size:

  • max.message.bytes: the maximum message size for this Topic

Topic-level parameters can be set in two ways:

  • Set this up when you create a Topic
  • Modify Topic Settings

The first way, at creation time: bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic transaction --partitions 1 --replication-factor 1 --config retention.ms=15552000000 --config max.message.bytes=5242880

The second way, modifying an existing Topic: bin/kafka-configs.sh --zookeeper localhost:2181 --entity-type topics --entity-name transaction --alter --add-config max.message.bytes=10485760

The second way is generally recommended; going forward, the Kafka community is likely to converge on the kafka-configs script for tuning Topic parameters.

3. JVM parameters

Kafka typically runs in a Java 8 environment.

On the JVM side, heap size is extremely important. The industry typically sets the Kafka JVM heap size to 6GB.

Garbage collector options:

  • If the Broker's machine has CPU to spare, the CMS collector is recommended: -XX:+UseConcMarkSweepGC
  • Otherwise, use throughput collector:-XX:+UseParallelGC

If you are already on Java 8, just use the G1 collector.

Set parameters for Kafka:

  • KAFKA_HEAP_OPTS: specifies the heap size
  • KAFKA_JVM_PERFORMANCE_OPTS: Specifies the GC parameters

Examples are as follows:

$> export KAFKA_HEAP_OPTS="-Xms6g -Xmx6g"
$> export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC -XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35 -XX:+ExplicitGCInvokesConcurrent -Djava.awt.headless=true"
$> bin/kafka-server-start.sh config/server.properties

4. Operating system parameters

Generally, we will pay attention to the following:

  • File descriptor restrictions
  • File System Type
  • Swappiness
  • Commit (flush) time

The first is ulimit -n, the limit on open file descriptors. The value itself has no special meaning, but if it is set too small you will frequently run into "Too many open files" errors.
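For example (the exact limit is arbitrary; pick something generous for a busy Broker):

```bash
# Raise the open-file limit for the current shell session
ulimit -n 1000000
# Verify the current limit
ulimit -n
```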

The second is the choice of file system; the common options are ext3, ext4, XFS, and ZFS. In terms of Kafka performance, roughly: ZFS > XFS > ext4 > ext3.

Next is swap tuning. First, what Swappiness means: it is a Linux kernel parameter that controls how aggressively the kernel swaps memory pages out to swap space rather than dropping pages from the system page cache. With swappiness=0, the kernel uses physical memory as much as possible and touches swap only as a last resort; with swappiness=100, it actively uses the swap partition and eagerly moves memory data to swap space. Swapping moves data from memory to disk, which increases system I/O and can seriously hurt performance. We typically set this parameter to 1 and configure alarms, so that once the Broker starts swapping we can diagnose and tune further. If it is set to 0, then when physical memory is exhausted the OOM Killer is triggered and kills an arbitrary process.
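A sketch of the usual setting (requires root; the persistent change lives in /etc/sysctl.conf):

```bash
# Apply immediately to the running kernel
sudo sysctl vm.swappiness=1
# Persist the setting across reboots
echo "vm.swappiness=1" | sudo tee -a /etc/sysctl.conf
```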

Finally, there is the flush (commit) time. When a message is written to Kafka, it is not written directly to disk; Kafka writes it to the operating system's Page Cache, and the operating system then periodically flushes the dirty pages in the Page Cache to the physical disk. This period is configurable, and the default is 5 seconds.
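Assuming the 5 seconds above refers to the Linux dirty-page writeback interval, here is a hedged sketch of inspecting and loosening it (values are in centiseconds):

```bash
# The kernel's flusher threads wake up every vm.dirty_writeback_centisecs
# (default 500, i.e. 5 seconds)
cat /proc/sys/vm/dirty_writeback_centisecs
# Example: stretch the interval to 30 seconds to reduce flush frequency,
# at the cost of losing more data if the machine crashes
sudo sysctl vm.dirty_writeback_centisecs=3000
```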