Kafka applications
This article summarizes the capabilities needed to keep Kafka clusters highly available, highly reliable, high-performance, high-throughput, and safe to operate when traffic reaches trillions, or even tens of trillions, of records per day.
The summary is based on Kafka 2.1.1 and covers cluster upgrades, data migration, traffic limiting, monitoring and alerting, load balancing, cluster expansion/reduction, resource isolation, cluster disaster recovery, cluster security, performance optimization, platformization, open source version defects, community dynamics, and so on. It focuses on the core ideas rather than implementation details. Let's first look at some of the core applications of Kafka as a data hub.
The figure below illustrates some mainstream data processing flows, with Kafka acting as the data hub.
Next, let's look at the overall Kafka platform architecture.
1.1 Version Upgrade
1.1.1 How to Upgrade and Roll Back the Open Source Version
Kafka.apache.org
1.1.2 How to Upgrade and Roll Back a Source-Modified Version
During an upgrade, old and new code logic inevitably coexist: some nodes in the cluster run the open source version while others run the modified version. You therefore need to consider how old and new code will mix during the upgrade, how they remain compatible, and how to roll back in the event of a failure.
1.2 Data Migration
Due to the architectural characteristics of Kafka, traffic load inevitably becomes unbalanced within a cluster, so we need to perform data migration to balance traffic across the nodes. The open source Kafka distribution provides the bin/kafka-reassign-partitions.sh script for data migration; you can use it directly if you have not implemented automatic load balancing.
The migration plan generated with the open source script is entirely manual, and when the cluster is very large, migration becomes very inefficient, usually measured in days. Of course, we can build an automatic balancer: once load balancing is automated, it mostly calls internal APIs, and a program generates the migration plan and executes the migration tasks for us. Note that there are two types of migration plans: with specified data directories and without. Plans that specify data directories require ACL authentication to be configured.
Kafka.apache.org
1.2.1 Data migration between brokers
No data directory is specified
// Migration plan without specifying data directories
{
  "version": 1,
  "partitions": [
    {"topic": "yyj4", "partition": 0, "replicas": [1000003, 1000004]},
    {"topic": "yyj4", "partition": 1, "replicas": [1000003, 1000004]},
    {"topic": "yyj4", "partition": 2, "replicas": [1000003, 1000004]}
  ]
}
Specify data directory
// Migration plan with specified data directories
{
  "version": 1,
  "partitions": [
    {"topic": "yyj1", "partition": 0, "replicas": [1000006, 1000005], "log_dirs": ["/data1/bigdata/mydata1", "/data1/bigdata/mydata3"]},
    {"topic": "yyj1", "partition": 1, "replicas": [1000006, 1000005], "log_dirs": ["/data1/bigdata/mydata1", "/data1/bigdata/mydata3"]},
    {"topic": "yyj1", "partition": 2, "replicas": [1000006, 1000005], "log_dirs": ["/data1/bigdata/mydata1", "/data1/bigdata/mydata3"]}
  ]
}
1.2.2 Broker Internal disk data migration
Servers in a production environment generally mount multiple disks, such as 4 or 12. Within a Kafka cluster, traffic may be balanced between brokers while the traffic between disks inside a broker is not, leaving some disks overloaded; this affects cluster performance and stability and makes poor use of hardware resources. In this case, we need to balance traffic across the disks within the broker so that it is more evenly distributed.
1.2.3 Concurrent Data Migration
The replica migration tool bin/kafka-reassign-partitions.sh in the current open source Kafka version (2.1.1) can only run migration tasks serially within a cluster. Even when multiple resource groups in a cluster are physically isolated and do not affect each other, migration tasks cannot be submitted in parallel, so migration efficiency is low. This problem was not solved until version 2.6.0. If you need concurrent data migration, you can either upgrade Kafka or modify the Kafka source code.
1.2.4 Stopping Data Migration
The replica migration tool bin/kafka-reassign-partitions.sh in the current open source Kafka version (2.1.1) cannot stop a migration task once it has started. When a migration task affects the stability or performance of the cluster, there is nothing you can do but wait for it to finish (successfully or not). This was not resolved until version 2.6.0. If you need to terminate a data migration, you can either upgrade Kafka or modify the Kafka source code.
1.3 Traffic Restriction
1.3.1 Production and consumption flow limitation
Sudden, unpredictable spikes in production or consumption traffic can put great pressure on cluster resources such as I/O and affect the stability and performance of the entire cluster. The purpose of the traffic limiting mechanism is not to restrict users, but to prevent sudden traffic from affecting cluster stability and performance, so that all users enjoy better service.
As shown in the following figure, the inbound traffic of a node jumps from about 140MB/s to 250MB/s, and the outbound traffic jumps from 400MB/s to 800MB/s. Without a traffic limiting mechanism, multiple nodes in the cluster risk being overwhelmed by such abnormal traffic, possibly even causing a cluster avalanche.
Production/consumption traffic limiting is documented on the official Kafka website (kafka.apache.org).
The official documentation provides the following dimension combinations for limiting producer and consumer traffic (this limiting mechanism has some drawbacks, which are discussed later in "Open source version feature defects"):
/config/users/<user>/clients/<client-id> // traffic limiting by user + client ID
/config/users/<user>/clients/<default>
/config/users/<user> // traffic limiting by user, the most commonly used method
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>
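For reference, these quota dimensions are written with kafka-configs.sh. A minimal sketch, assuming ZooKeeper at zk1:2181 and an ACL user named user1 (addresses, names, and byte rates are illustrative):

# Limit user1 to 10 MB/s produce and 20 MB/s fetch on each broker (quotas are applied per broker)
bin/kafka-configs.sh --zookeeper zk1:2181 --alter \
  --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' \
  --entity-type users --entity-name user1

# Set a default quota for all clients (the /config/clients/<default> dimension above)
bin/kafka-configs.sh --zookeeper zk1:2181 --alter \
  --add-config 'producer_byte_rate=10485760,consumer_byte_rate=20971520' \
  --entity-type clients --entity-default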
JMX must be enabled when the Kafka broker service starts so that other applications can collect Kafka's JMX metrics for service monitoring. When a user needs their limiting threshold adjusted, an intelligent assessment can be made, based on how much traffic a single broker can withstand, of whether the threshold can be raised, without human intervention. The following metrics are useful when limiting user traffic:
(1) Consumption traffic metric: ObjectName: kafka.server:type=Fetch,user=<ACL-authenticated user name>
    Attributes: byte-rate (the user's outbound traffic on the current broker), throttle-time (how long the user's outbound traffic has been throttled)
(2) Production traffic metric: ObjectName: kafka.server:type=Produce,user=<ACL-authenticated user name>
    Attributes: byte-rate (the user's inbound traffic on the current broker), throttle-time (how long the user's inbound traffic has been throttled)
1.3.2 Follower-Leader Synchronization / Data Migration Traffic Limiting
Replica migration / data synchronization traffic limiting is documented on the official Kafka website.
Parameters involved are as follows:
// The following four parameters are involved in the replica synchronization traffic limiting configuration
leader.replication.throttled.rate
follower.replication.throttled.rate
leader.replication.throttled.replicas
follower.replication.throttled.replicas
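These parameters are also typically set with kafka-configs.sh: the two rates are broker-level configs and the throttled-replica lists are topic-level configs. A minimal sketch with an illustrative broker ID, topic name, and rate:

# Throttle leader/follower replication on broker 1000003 to 50 MB/s
bin/kafka-configs.sh --zookeeper zk1:2181 --alter \
  --add-config 'leader.replication.throttled.rate=52428800,follower.replication.throttled.rate=52428800' \
  --entity-type brokers --entity-name 1000003

# Apply the throttle to all replicas of topic yyj4
bin/kafka-configs.sh --zookeeper zk1:2181 --alter \
  --add-config 'leader.replication.throttled.replicas=*,follower.replication.throttled.replicas=*' \
  --entity-type topics --entity-name yyj4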
Auxiliary indicators are as follows:
(1) Replica synchronization outbound traffic metric: ObjectName: kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesOutPerSec
(2) Replica synchronization inbound traffic metric: ObjectName: kafka.server:type=BrokerTopicMetrics,name=ReplicationBytesInPerSec
1.4 Monitoring Alarms
There are several open source tools available for Kafka monitoring, such as the following:
Kafka Manager;
Kafka Eagle;
Kafka Monitor;
KafkaOffsetMonitor;
We have embedded Kafka Manager in our platform as a tool for viewing some basic metrics, but these open source tools do not integrate well into our own business systems or platforms, so we need to build a monitoring and alerting system that is finer-grained, smarter, and more accurate. Its coverage should include the underlying hardware, the operating system (the OS occasionally hangs, causing the broker to stop serving), the Kafka broker service, Kafka client applications, the ZooKeeper cluster, and full-link monitoring upstream and downstream.
1.4.1 Hardware Monitoring
Network monitoring:
Core indicators include inbound traffic, outbound traffic, packet loss, network retransmissions, the number of TCP connections in TIME_WAIT, switch bandwidth, equipment-room bandwidth, and DNS server health (if the DNS server is abnormal, traffic black holes can occur and cause large-scale service failures).
Disk monitoring:
Core metrics include disk write, disk read (if consumption has no lag or only a small lag, there are generally no disk reads), disk ioutil, disk iowait (a high value indicates heavy disk load), disk free space, disk failures, and bad blocks/bad tracks (bad tracks or blocks can leave the broker in a half-dead state and make consumers hang on CRC check failures), and so on.
CPU monitoring:
Monitor CPU idle/load, motherboard failures, and so on. CPU is generally not a bottleneck for Kafka, so its usage is usually low.
Memory/swap area monitoring:
Monitor memory and swap usage. In general, apart from the heap allocated when the Kafka broker starts, all remaining memory on the server is used for the page cache.
Cache hit ratio monitoring:
Since disk reads affect Kafka performance, we need to monitor the Linux page cache hit ratio. A high hit ratio means consumers are reading mostly from the page cache rather than from disk.
Read the article “Linux Page Cache tuning in Kafka” for details.
System logs:
You need to monitor the error logs of the operating system to detect hardware faults in time.
1.4.2 Broker service monitoring
Broker services are monitored by specifying a JMX port when the broker service starts and then implementing a metrics-collection program to gather the JMX metrics. (Server metrics are listed on the official website.)
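As a reference, exposing JMX is typically done through an environment variable honored by the standard startup scripts; a minimal sketch (the port number is illustrative):

# Expose JMX on port 9999 so an external collector can scrape broker metrics
export JMX_PORT=9999
bin/kafka-server-start.sh -daemon config/server.properties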
Broker-level monitoring: broker process liveness, broker inbound traffic (bytes/records), broker outbound traffic (bytes/records), replica synchronization inbound traffic, replica synchronization outbound traffic, traffic deviation between brokers, broker connection count, broker request queue size, broker network idle ratio, broker production latency, broker consumption latency, number of produce requests, number of fetch requests, number of leaders distributed on the broker, number of replicas distributed on the broker, traffic on each of the broker's disks, broker GC, and so on.
Topic-level monitoring: topic inbound traffic (bytes/records), topic outbound traffic (bytes/records), topics with no traffic, topic traffic mutations (sudden increases/drops), and topic consumption lag.
Partition-level monitoring: partition inbound traffic (bytes/records), partition outbound traffic (bytes/records), missing topic partition replicas, partition consumption lag records, partition leader switches, partition data skew (when producing messages, specifying a message key easily causes data skew, which severely affects Kafka's service performance), and partition storage size (topics whose single partition is too large can then be governed).
User-level monitoring: bytes of user incoming and outgoing traffic, duration of limiting user incoming and outgoing traffic, and user traffic abrupt change (sudden increase or decrease).
Broker service log monitoring: Monitors error logs generated on the server to detect service exceptions in time.
1.4.3 Client Monitoring
Client monitoring mainly consists of implementing a metrics-reporting program. This program needs to implement the org.apache.kafka.common.metrics.MetricsReporter interface and is then added to the producer/consumer configuration via metric.reporters, as follows:
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// ClientMetricsReporter implements the org.apache.kafka.common.metrics.MetricsReporter interface
props.put(ProducerConfig.METRIC_REPORTER_CLASSES_CONFIG, ClientMetricsReporter.class.getName());
...
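For reference, a reporter skeleton might look like the sketch below. Only the interface and its methods come from Kafka; the class body, the report() helper, and where the metrics are pushed are assumptions for illustration:

import java.util.List;
import java.util.Map;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;

public class ClientMetricsReporter implements MetricsReporter {
    @Override
    public void configure(Map<String, ?> configs) { }   // read custom reporter settings here if needed

    @Override
    public void init(List<KafkaMetric> metrics) {        // metrics that already exist at client startup
        metrics.forEach(this::report);
    }

    @Override
    public void metricChange(KafkaMetric metric) {        // called when a metric is added or updated
        report(metric);
    }

    @Override
    public void metricRemoval(KafkaMetric metric) { }     // called when a metric is removed

    @Override
    public void close() { }                               // flush/close the connection to the monitoring backend

    private void report(KafkaMetric metric) {
        // push metric.metricName() and metric.metricValue() to your own monitoring system
        System.out.println(metric.metricName() + " = " + metric.metricValue());
    }
}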
Official website address of client indicators:
Kafka.apache.org/21/document…
Kafka.apache.org/21/document…
Kafka.apache.org/21/document…
Kafka.apache.org/21/document…
Kafka.apache.org/21/document…
Kafka.apache.org/21/document…
The client monitoring process architecture is shown in the figure below:
1.4.3.1 Producer Client Monitoring
Dimensions: user name, client ID, client IP, Topic name, cluster name, brokerIP;
Indicators: Number of connections, I/O waiting time, production traffic, number of production records, number of requests, request delay, and number of sending errors or retries.
1.4.3.2 Consumer client monitoring
Dimensions: User name, client ID, client IP, Topic name, cluster name, Consumer Group, brokerIP, Topic partition;
Indicators: connection number, IO wait time, consumption traffic size, consumption record number, consumption delay, topic partition consumption delay record, etc.
1.4.4 Zookeeper monitor
Monitor the Zookeeper process.
Monitor the Zookeeper leader switchover.
Error log monitoring of the Zookeeper service
1.4.5 Link Monitoring
When the data link is very long (for example: business application -> tracking SDK -> data collection -> Kafka -> real-time computing -> business application), finding the link where a problem occurred usually requires repeated communication and checks with multiple teams, so troubleshooting is inefficient. In this case, we need to build monitoring across the entire link together with upstream and downstream teams, so that when a problem occurs, the failing link can be located immediately, shortening fault location and recovery times.
1.5 Resource Isolation
1.5.1 Physical Isolation of Different Service Resources in the Same Cluster
Resource groups for different services within the cluster are physically isolated so that services do not affect each other. Suppose the cluster has four broker nodes (Broker1/Broker2/Broker3/Broker4) and two businesses (Business A/B), whose topic partitions are distributed as shown in the figure below: both businesses' topics are scattered across the cluster's brokers and also overlap at the disk level.
Imagine one business behaving abnormally, for example a traffic surge that causes broker nodes to fail or hang. The other business would also be affected, greatly reducing service availability, causing failures, and enlarging the blast radius.
To address these pain points, we can physically isolate the services' resources within the cluster so that each service has exclusive resources, dividing the brokers into resource groups (here the four brokers are split into two groups, Group1 and Group2), as shown in the figure below. Topics of different services are then distributed within their own resource groups; when one service is abnormal the other is unaffected, which effectively limits the blast radius and improves service availability.
1.6 Cluster Classification
Based on service characteristics, clusters are divided into log clusters, monitoring clusters, billing clusters, search clusters, offline clusters, online clusters, and so on. Services in different scenarios are placed in different clusters so that they do not affect each other.
1.7 Capacity Expansion or Reduction
1.7.1 Topic Partition Expansion
As the volume of a topic's data grows, the number of partitions originally specified for the topic may no longer satisfy the traffic, so we need to add partitions to the topic. Consider the following points when expanding partitions (a minimal CLI sketch follows this list):
Ensure that the leaders and followers of the topic's partitions are distributed round-robin across all brokers in the resource group so that traffic is distributed more evenly; also consider distributing the different replicas of the same partition across racks to improve disaster recovery.
When the number of topic leaders divided by the number of nodes in the resource group leaves a remainder, the remaining leaders should preferentially be placed on brokers with lower traffic.
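The expansion itself uses the standard topic tool; a minimal sketch with an illustrative topic name, partition count, and ZooKeeper address (in Kafka 2.1.1 kafka-topics.sh still connects via --zookeeper):

# Increase the partition count of topic yyj4 to 12 (partitions can only be increased, never decreased)
bin/kafka-topics.sh --zookeeper zk1:2181 --alter --topic yyj4 --partitions 12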
1.7.2 Bringing Brokers Online
As the volume of business and data increases, our cluster also needs to expand broker nodes. For capacity expansion, we need to achieve the following:
Intelligent expansion assessment: evaluate whether expansion is needed, based on cluster load, in a programmatic and intelligent way;
Intelligent capacity expansion: After capacity expansion is evaluated, the capacity expansion process and traffic balancing are platformized.
1.7.3 Taking Brokers Offline
In certain scenarios we need to take brokers offline, including the following:
Some aging servers need to be decommissioned, so node decommissioning must be platformized;
A server fails and the broker cannot be recovered, so the faulty server needs to be taken offline, again via the platform;
Existing broker nodes are replaced with better-configured servers, so node decommissioning must be platformized.
1.8 Load Balancing
Why do we need load balancing? First, look at the first figure below, which shows the traffic distribution of one resource group in our cluster after expansion: traffic is not automatically routed to the newly added nodes. At this point we need to manually trigger data migration and move some replicas to the new nodes to balance the traffic.
Now look at the second figure: the traffic distribution is very uneven, with the lowest and highest traffic differing by a factor of several. This is a consequence of Kafka's architecture; once the cluster size and data volume reach a certain level, the problem inevitably appears. In this case we also need load balancing.
In the third figure we can see that only some nodes experience a traffic surge, because the topic's partitions are not spread across the cluster but concentrated on a few brokers. In this case we also need to expand partitions and rebalance.
Our ideal traffic distribution should be as shown in the figure below. The traffic deviation between nodes is very small. In this case, it can not only enhance the cluster’s ability to withstand the abnormal surge of traffic, but also improve the overall resource utilization rate and service stability of the cluster, and reduce costs.
Load balancing we need to achieve the following effects:
1) Generate copy migration plan and perform migration task platformization, automation and intelligentization;
2) After balancing, the flow between brokers is more uniform, and a single topic partition is evenly distributed on all broker nodes;
3) After balancing, the traffic between disks within the broker is balanced.
To achieve this, we need to develop our own load balancing tool, for example through secondary development of the open source Cruise Control. The core of such a tool is the strategy for generating migration plans, which directly determines the effectiveness of cluster load balancing. Reference material:
1. linkedIn/cruise-control
2. Introduction to Kafka Cruise Control
3. Cloudera Cruise Control REST API Reference
The Cruise Control architecture diagram is as follows:
When generating a migration plan, we need to consider the following:
1) Select core indicators as the basis for generating migration plan, such as outbound flow, inbound flow, rack, single topic partition dispersion, etc.;
2) Optimize the indicator samples used to generate migration plan, such as filtering abnormal samples such as sudden increase/sudden drop/zero drop of flow;
3) All the samples required by the migration plan of each resource group are internal samples of the resource group and do not involve other resource groups.
4) Manage topics that are too large in a single partition, so that the distribution of topic partitions is more dispersed and the flow is not concentrated in some brokers, so that the amount of data in a single partition is smaller, which can reduce the amount of data migrated and improve the migration speed;
5) Topics that have been evenly dispersed in the resource group should be added to the migration blacklist and not migrated, which can reduce the amount of migrated data and improve the migration speed;
6) Conduct topic governance to eliminate the interference of long-term no flow topic on equilibrium;
7) When creating a topic or expanding its partitions, all partitions should be distributed round-robin across all broker nodes, and any partitions remaining after the round-robin should preferentially go to brokers with lower traffic;
8) When load balancing runs after new broker nodes are added, preferentially migrate to the new nodes some of the partition leaders of the same high-traffic topics ("high traffic" here means throughput per second rather than storage size);
9) When submitting a migration task, keep the size deviation of the partitions in the same batch of the migration plan as small as possible, to avoid the task skew caused by small and medium partitions finishing quickly while everyone waits a long time for large partitions to migrate;
1.9 Security Certification
Is our cluster accessible to everyone? Of course not. For cluster security, we need permission authentication to block illegal operations. Security authentication is mainly needed in the following areas:
(1) Producer authority authentication;
(2) consumer authority authentication;
(3) Specify data directory migration security authentication;
Kafka.apache.org
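For reference, producer/consumer authorization is usually managed with the built-in ACL tool. A minimal sketch, assuming an ACL-enabled cluster; the principals, topic, group, and ZooKeeper address are illustrative:

# Allow user appA to produce to topic yyj4
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181 --add \
  --allow-principal User:appA --producer --topic yyj4

# Allow user appB to consume topic yyj4 with consumer group yyj4-group
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=zk1:2181 --add \
  --allow-principal User:appB --consumer --topic yyj4 --group yyj4-group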
1.10 Cluster Disaster Recovery
Cross-rack disaster recovery:
Kafka.apache.org
Cross-cluster/equipment-room disaster recovery: for service scenarios such as remote active-active, see MirrorMaker 2.0 introduced in Kafka 2.7.
GitHub: github.com
Exact KIP address: cwiki.apache.org
Kafka metadata recovery based on the ZooKeeper cluster: we regularly back up the permission metadata in ZooKeeper so that the data can be restored when cluster metadata becomes abnormal.
1.11 Parameter/Configuration Optimization
Broker service parameter optimization: I have listed only some of the core parameters that affect performance.
num.network.threads  # Number of Processor threads handling network requests. Recommended: CPU cores * 2. If this value is too low, you will often see a low network idle ratio and missing replicas.
num.io.threads  # Number of KafkaRequestHandler request-handling threads. Recommended: number of broker disks * 2.
num.replica.fetchers  # Recommended: CPU cores / 4. Raising it appropriately improves CPU utilization and the parallelism with which followers synchronize data from the leader.
compression.type  # lz4 compression is recommended to improve CPU utilization and reduce the amount of data transmitted over the network.
queued.max.requests  # In a production environment a value of at least 500 is recommended; the default is 500.
log.flush.scheduler.interval.ms / log.flush.interval.ms / log.flush.interval.messages  # These parameters control flushing log data to disk. Keep the default configuration and let the operating system decide when to flush; setting them explicitly can hurt throughput significantly.
auto.leader.rebalance.enable  # Whether automatic leader rebalancing is enabled; the default is true. We should set it to false, because automatic rebalancing is uncontrollable and may affect cluster performance and stability.
Production optimization: here I list only some of the core parameters that affect performance.
linger.ms  # How long the client waits before sending a batch of messages to the server, in milliseconds; used together with batch.size. Increasing it appropriately improves throughput, but if the client goes down there is a risk of losing buffered data.
batch.size  # The batch size of messages the client sends to the server, used together with linger.ms. Increasing it appropriately improves throughput, with the same risk of data loss if the client goes down.
compression.type  # lz4 is recommended for its compression ratio and throughput. Since Kafka's CPU requirements are not high, compression makes fuller use of CPU resources and improves network throughput.
buffer.memory  # The size of the client send buffer. For large topics, and when memory is sufficient, it can be raised above the default 33554432 (32 MB).
retries  # Number of retries after a produce failure; the default is 0 and can be increased appropriately. If the business requires high data accuracy, add your own fault tolerance for sends that still fail after the retries are exhausted.
retry.backoff.ms  # The retry interval after a produce failure; the default is 100 ms. Do not set it too large or too small.
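A minimal sketch of these parameters applied in client code; the values below are illustrative rather than recommendations and should be tuned against your own throughput and durability requirements:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.LINGER_MS_CONFIG, 50);              // wait up to 50 ms to fill a batch
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 262144);         // 256 KB batches
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "lz4");    // trade a little CPU for network throughput
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 67108864L);   // 64 MB client send buffer
props.put(ProducerConfig.RETRIES_CONFIG, 3);                 // retry transient send failures
props.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 300);      // 300 ms between retries
KafkaProducer<String, String> producer = new KafkaProducer<>(props);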
In addition to some core parameter optimization, we also need to consider, for example, the number of partitions of topic and the retention time of topic; If the number of partitions is too small and the retention time is too long, but the amount of data written is too large, the following problems may occur:
1) Topic partitions are concentrated on several broker nodes, resulting in unbalanced traffic copies;
2) Some disks inside the broker are overloaded with reads and writes, and their storage gets written full.
1.11.1 Consumption optimization
The biggest and most frequent problem with consumption is consumption lag and pulling historical data. When a large amount of historical data is pulled, heavy disk reads occur and pollute the page cache, which increases disk load and affects cluster performance and stability.
How can you reduce or avoid large delays?
1) When the topic's data volume is very large, it is recommended to use one consumer thread per partition (see the sketch after this list);
2) Add monitoring alarms for topic consumption delay, and timely discover and deal with them;
3) When the topic's data can be discarded and a huge lag is encountered (for example, the lag of a single partition exceeds tens of millions or even hundreds of millions of records), the topic's consumption offsets can be reset as an emergency measure; [this approach is generally only used in extreme scenarios]
4) Avoid resetting a topic's partition offsets to a very early position, which would cause a large amount of historical data to be pulled;
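Below is a minimal sketch of the "one consumer thread per partition" idea from point 1); the broker address, topic, group, and partition count are illustrative, and each thread owns its own KafkaConsumer because KafkaConsumer is not thread-safe:

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class PerPartitionConsumers {
    public static void main(String[] args) {
        int partitionCount = 12;  // assumed partition count of the topic: one consumer thread per partition
        ExecutorService pool = Executors.newFixedThreadPool(partitionCount);
        for (int i = 0; i < partitionCount; i++) {
            pool.submit(() -> {
                Properties props = new Properties();
                props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");
                props.put(ConsumerConfig.GROUP_ID_CONFIG, "yyj4-group");
                props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
                // the group coordinator assigns roughly one partition to each of the consumers
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("yyj4"));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                        records.forEach(r -> System.out.println(r.partition() + ":" + r.offset()));  // placeholder processing
                    }
                }
            });
        }
    }
}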
1.11.2 Optimization of Linux Server Parameters
We need to optimize Linux file handles, pagecache, and other parameters. See Linux Page Cache tuning in Kafka.
1.12 Hardware Optimization
Disk optimization
If possible, replace HDDs with solid-state drives (SSDs) to overcome the poor I/O performance of mechanical drives; if SSDs are not available, build hardware RAID across the server's multiple disks (usually RAID 10) to balance the I/O load on the broker node. With HDD mechanical drives, one broker can mount multiple disks, for example 12 * 4TB.
memory
Kafka is a high frequency read/write service, and Linux read/write requests go mostly through the Page Cache. Therefore, a larger memory on a single node can significantly improve performance. Generally, choose 256GB or higher.
network
Increase network bandwidth: if conditions permit, the more network bandwidth the better, so that bandwidth never becomes the bottleneck; at least a 10-gigabit network (10Gb, full-duplex NIC) is needed for relatively high throughput. On a single channel, the theoretical upper limit for the sum of outbound and inbound network traffic is 1.25GB/s; with full duplex and dual channels, inbound and outbound traffic can each theoretically reach 1.25GB/s.
Network isolation tagging: the same equipment room may host offline clusters (such as HBase, Spark, and Hadoop) and real-time clusters (such as Kafka). If their servers are mounted on the same switch, the real-time and offline clusters compete for network bandwidth, and the offline cluster may affect the real-time cluster. We therefore need switch-level isolation so that offline machines and the real-time cluster are not mounted on the same switch. Where they do share a switch, we tag network traffic priorities (gold, silver, copper, iron) so that real-time traffic is prioritized when bandwidth is tight.
CPU
The bottleneck for Kafka is not the CPU. A single node typically has 32 cores.
1.13 platform
Now the question is: we have mentioned a lot of monitoring, optimization, and so on. Do administrators or business users have to log on to the cluster servers for every cluster operation? Of course not; we need rich platform functionality to support this. On the one hand, it improves our operating efficiency; on the other, it improves cluster stability and reduces the possibility of mistakes.
Configuration management
With black-screen (command line) operation, each modification of a broker's server.properties configuration file leaves no traceable change record. Sometimes someone's change to the cluster configuration causes a failure, yet no record of the change can be found. If we move configuration management onto the platform, every change is tracked, and the risk of erroneous changes is reduced.
Rolling restart
When we need to make online changes, we sometimes have to perform a rolling restart of multiple nodes of the cluster. Doing this from the command line is inefficient and requires manual intervention, wasting manpower. We therefore need to platformize this repetitive work to improve our operating efficiency.
Cluster management
Cluster management mainly means moving the operations originally done on the command line onto the platform, so that users and administrators no longer need to operate the Kafka cluster through a black screen. This has the following advantages:
Improve operational efficiency;
The probability of operation error is smaller, and the cluster is more secure.
All operations are traceable and traceable;
Cluster management mainly includes broker management, topic management, production/consumption permission management, user management, etc
1.13.1 Mock Functionality
The platform provides sample-data production and consumption sampling for users' topics, so users can test whether a topic is usable and whether its permissions are normal without writing any code.
The platform also provides production/consumption permission verification for users' topics, so users can determine whether their account has read/write permission on a given topic.
1.13.2 Rights Management
Platform related operations such as user read/write permission management.
1.13.3 Capacity Expansion or Reduction
Broker node onboarding and decommissioning are handled on the platform, so bringing nodes online or taking them offline no longer requires any command-line operations.
1.13.4 Cluster Governance
1) Governance of no flow topic: clean up no flow topic in the cluster to reduce the pressure caused by excessive useless metadata on the cluster;
2) Manage the data size of topic partitions, sort out the topics with excessive data volume (for example, the data volume of a single partition exceeds 100GB/ day), and see whether it needs to be expanded to avoid data concentration on some nodes of the cluster;
3) Topic partition data skew governance to avoid client specifying key of message when producing message, but key is too centralized and messages are only distributed in some partitions, resulting in data skew;
4) Decentralized management of topic partitioning, which enables topic partitioning to be distributed on as many brokers as possible in the cluster, so as to avoid the risk that the flow will only be concentrated on a few nodes due to the sudden increase of topic traffic, and also avoid the great influence of a certain broker abnormality on topic;
5) Consumption lag governance for topic partitions; high consumption lag generally has two causes: either cluster performance has degraded, or the business side's consumption concurrency is insufficient. If the concurrency is insufficient, the business side should be contacted to increase it.
1.13.5 Monitoring Alarms
1) Make all indicator collection platforms configurable, provide a unified indicator collection, indicator display and alarm platform, and realize integrated monitoring;
2) Associate upstream and downstream businesses to make full link monitoring;
3) Users can configure topic or zone traffic delay, mutation and other monitoring alarms;
1.13.6 Large Service Screen
Main indicators on the service dashboard: number of clusters, number of nodes, daily inbound traffic size, daily inbound traffic records, daily outbound traffic size, daily outbound traffic records, inbound traffic size per second, inbound records per second, outbound traffic size per second, outbound records per second, number of users, production latency, consumption latency, data storage size, data reliability, service availability, number of topics, number of partitions, number of replicas, number of consumer groups, number of resource groups, and so on.
1.13.7 Traffic Restriction
User traffic limiting is handled on the platform, where intelligent limiting is carried out.
1.13.8 Load Balancing
The automatic load balancing function is implemented on the platform for scheduling and management.
1.13.9 Resource budget
When the cluster reaches a certain scale and traffic keeps growing, where do the machines for cluster expansion come from? Through business resource budgeting, the multiple businesses sharing a cluster split the hardware cost of the whole cluster according to the traffic they generate; for independent clusters and isolated resource groups, the budget can of course be calculated separately.
1.14 Performance Evaluation
1.14.1 Single Broker Performance Evaluation
Our objectives for single broker performance evaluation include the following:
1) Provide a basis for our resource application evaluation;
2) Let us have a better understanding of the cluster’s read and write ability and where the bottleneck is, and optimize for the bottleneck;
3) It provides a basis for setting the current limiting threshold;
4) Provide a basis for us to evaluate when expansion should be carried out;
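The built-in performance tools are a convenient starting point for this kind of single-broker evaluation. A minimal sketch with an illustrative topic name, record counts, and broker address:

# Producer stress test: 1,000,000 records of 1 KB each, unthrottled
bin/kafka-producer-perf-test.sh --topic perf-test --num-records 1000000 --record-size 1024 \
  --throughput -1 --producer-props bootstrap.servers=broker1:9092 acks=1

# Consumer stress test: read 1,000,000 messages back from the same topic
bin/kafka-consumer-perf-test.sh --broker-list broker1:9092 --topic perf-test --messages 1000000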
1.14.2 Topic Partitioning performance evaluation
1) Provide a reasonable basis for evaluating how many partitions should be specified when we create a topic;
2) Provide basis for our topic zoning expansion evaluation;
1.14.3 Performance Evaluation of a Single disk
1) To understand the true read/write capability of the disk, and to provide a basis for us to choose a more appropriate Kafka disk type;
2) Provide the basis for setting the disk traffic alarm threshold;
1.14.4 Cluster size limitation
1) We need to understand the upper limit of a single cluster's size and metadata volume, and explore how these factors affect cluster performance and stability;
2) Based on those findings, assess a reasonable range for cluster node scale, predict risks in time, and split clusters that have become too large;
1.15 DNS+LVS Network Architecture
When our cluster reaches a certain size, for example hundreds of broker nodes in a single cluster, how should we fill in the bootstrap.servers configuration? Should we pick just a few brokers, or list all of them?
If we configure only a few IP addresses, then when those broker nodes go offline our applications can no longer connect to the Kafka cluster. Configuring all the IP addresses is even less realistic when there are hundreds of them, so what do we do?
Solution: adopt a DNS+LVS network architecture, so that producer and consumer clients ultimately only need to configure a domain name. Note that when a new node joins the cluster, a mapping must be added; when a node goes offline, it must be removed from the mapping. Otherwise, if those machines are reused elsewhere with the same port as Kafka, part of the original cluster's requests will be sent to the decommissioned server, causing a serious production failure.
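With this architecture the client-side configuration stays the same no matter how many brokers the cluster has; the addresses and domain name below are purely illustrative:

# Instead of listing many broker IPs ...
bootstrap.servers=10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092
# ... clients only configure the cluster domain name fronted by DNS+LVS
bootstrap.servers=kafka-log-cluster.example.com:9092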
2. Open Source Version Feature Defects
2.1 Copy Migration
Unable to implement incremental migration; [We have implemented incremental migration based on 2.1.1 version of the source code transformation]
Unable to implement concurrent migration; Concurrent migration was not implemented in the open source version until 2.6.0.
Migration cannot be stopped once started; the ability to cancel a replica migration was not added to the open source version until 2.6.0 (note that cancellation is different from stopping: metadata is not rolled back).
When the migration data directory is specified, if the retention time of topic is shortened during the migration process, the retention time of topic does not take effect for the migrating topic partition, and the expired data of topic partition cannot be deleted. [Bug in open source version, not fixed yet]
If you specify a data migration directory and the migration plan is as follows, the entire migration task cannot be completed and remains stuck. [Bug in open source version, not fixed yet]
During migration, if a broker node is restarted, all leader partitions on that broker node cannot be switched back, causing all node traffic to be transferred to other nodes. The leader will not switch back until all copies are migrated. The open source version is buggy and has not been fixed yet.
In the native Kafka version, migration fails in the following data-directory scenario (this bug has not yet been fixed):
For the same topic partition, if some of the target replicas change broker relative to the original replicas while other target replicas only change data directory within their broker, the target replicas that changed broker can be migrated normally, but the target replicas that only changed data directory within the broker cannot. However, the old replicas can still serve production and consumption normally, and the stuck task does not prevent the next migration task from being submitted: as long as the next migration plan changes the broker list in the partition's replica list, the migration completes normally and the leftover, unfinished target replicas are cleaned up.
Assume the initial replica distribution of topic yyj1 is as follows:
{
  "version": 1,
  "partitions": [
    {"topic": "yyj", "partition": 0, "replicas": [1000003, 1000001], "log_dirs": ["/kfk211data/data31", "/kfk211data/data13"]}
  ]
}
// Migration scenario 1:
{
  "version": 1,
  "partitions": [
    {"topic": "yyj", "partition": 0, "replicas": [1000003, 1000002], "log_dirs": ["/kfk211data/data32", "/kfk211data/data23"]}
  ]
}
// Migration scenario 2:
{
  "version": 1,
  "partitions": [
    {"topic": "yyj", "partition": 0, "replicas": [1000002, 1000001], "log_dirs": ["/kfk211data/data22", "/kfk211data/data13"]}
  ]
}
Given the replica distribution of topic yyj1 above, if our migration plan is "migration scenario 1" or "migration scenario 2", some replicas cannot be migrated. This does not prevent the old replicas from serving production and consumption requests, and we can still submit other migration tasks normally. To clean up the old, unmigrated replicas, we only need to modify the migration plan once (so that the new target replica list is completely different from the partition's currently assigned replica list) and submit the migration again. Continuing the example above, the modified migration plan is:
{
  "version": 1,
  "partitions": [
    {"topic": "yyj", "partition": 0, "replicas": [1000004, 1000005], "log_dirs": ["/kfk211data/data42", "/kfk211data/data53"]}
  ]
}
This way the migration can complete normally.
2.2 Traffic Limiting
The granularity of current limiting is coarse, which is not flexible, accurate and intelligent enough.
Current traffic limiting dimension combination
/config/users/<user>/clients/<client-id>
/config/users/<user>/clients/<default>
/config/users/<user>
/config/users/<default>/clients/<client-id>
/config/users/<default>/clients/<default>
/config/users/<default>
/config/clients/<client-id>
/config/clients/<default>
There is a problem
If multiple users produce and consume heavily on the same broker at the same time, then to keep each broker from being overwhelmed, the sum of all user traffic thresholds must not exceed the broker's upper limit; if it does, the broker risks hanging. Moreover, even when total user traffic is below the broker's limit, if all the traffic concentrates on a few disks and exceeds their read/write capacity, all production and consumption requests will be blocked and the broker may appear dead.
The solution
(1) Modify the source code to implement per-broker traffic limiting: as soon as traffic reaches the broker's upper limit, limiting kicks in immediately and all users writing to that broker can be throttled; or handle it by user priority, letting high-priority traffic through and restricting low-priority traffic;
(2) Modify the source code to implement per-disk traffic limiting on the broker (much of the time traffic concentrates on a few disks and exceeds a single disk's read/write limit before the broker's limit is reached): as soon as a disk reaches its upper limit, limiting kicks in immediately and all users writing to that disk can be throttled; or handle it by user priority, letting high-priority traffic through and restricting low-priority traffic;
(3) Modify the source code to implement topic-level traffic limiting and the ability to ban writes to specific topic partitions;
(4) Modify the source code to implement precise traffic limiting across combined dimensions such as user, broker, disk, and topic;
3. Kafka Development Trends
3.1 Kafka Community Iteration Plan
3.2 Phasing out ZooKeeper (KIP-500)
3.3 Separation of Controller and Broker and introduction of RAFT protocol as controller mediation mechanism (KIP-630)
3.4 Layered Storage (KIP-405)
3.5 Can reduce topic partitions (KIP-694)
3.6 MirrorMaker2 Precision once (KIP-656)
3.7 Downloads and Version Features
3.8 Kafka all KIP addresses
4. How to Contribute to the Community
4.1 What points can be contributed
Kafka.apache.org/contributin…
4.2 Wiki Contribution Address
Cwiki.apache.org/confluence/…
4.3 issues address
1) issues.apache.org/jira/projec…
2) issues.apache.org/jira/secure…
4.4 the main committers
kafka.apache.org/committers
Yang Yijun, Vivo Internet Server Team