An original article from Hornet's Nest technology. For more content, please subscribe to the public account: mFWtech

Kafka is a popular message queue middleware. It can process massive amounts of data in real time, offers high throughput and low latency, and provides a reliable mechanism for asynchronous message delivery, which makes it a good solution for data communication and transfer between different systems.

Kafka is also widely used at Hornet's Nest, supporting many core businesses. This article focuses on how Kafka is applied in Hornet's Nest's big data platform: the relevant business scenarios, the problems we encountered at different stages of adopting Kafka and how we solved them, and our follow-up plans.

Part.1 Application Scenarios

The application scenarios of Kafka in big data platforms can be divided into the following three categories:

The first is using Kafka as a data store to provide real-time data storage services for the big data platform. By source and use, real-time data can be divided into business DB data, monitoring logs, event-tracking ("buried point") client logs (H5, web, app, mini program), and server logs.

The second is providing data sources for data analysis. Each tracking log serves as a data source that supports and connects to offline data, the real-time data warehouse, and analysis systems, including multidimensional queries, real-time Druid OLAP, log details, and so on.

The third is providing data subscription for business teams. Besides internal use within the big data platform, we also use Kafka to provide data subscription services for core businesses such as recommendation and search, transportation, hotels, and the content center, covering real-time user feature computation, real-time user profile training, real-time recommendation, anti-cheating, and business monitoring and alerting.

Main applications are shown in the figure below:

Part.2 Evolution

Four stages

The big data platform introduced Kafka early on as the collection and processing system for business logs, mainly because its high throughput, low latency, multiple-subscription support, and data backtracking fit big data scenarios well. But as the business grew rapidly, problems emerged in both usage and maintenance: the registration and monitoring mechanisms were incomplete, so issues could not be located quickly, and some online real-time tasks could not recover quickly after failures, causing message backlogs. The stability and availability of the Kafka clusters were challenged, and we experienced several serious failures.

Solving these problems became both urgent and difficult. To address the pain points of using Kafka in the big data platform, we carried out a series of practices ranging from cluster usage to application-layer extensions, which can roughly be divided into four stages:

Stage 1: Version upgrade. Based on the bottlenecks and problems we hit in producing and consuming platform data, we evaluated candidate Kafka versions and finally decided on version 1.1.1.

Stage 2: Resource isolation. To support rapid business growth, we improved the setup of multiple clusters and the resource isolation among topics within a cluster.

Stage 3: Permission control and monitoring alarms.

First, in terms of security, the early Kafka clusters ran without any protection. Because Kafka is shared by multiple product lines, it was easy to mistakenly read topics belonging to other businesses, creating data security risks. We therefore added authentication and authorization based on SASL/SCRAM + ACL.

In terms of monitoring and alerting, Kafka has become the standard input data source for real-time computing, where consumer lag and throughput are important indicators of the health of real-time tasks. The big data platform therefore built a unified Kafka monitoring and alerting platform named "Radar", which monitors Kafka clusters and their users across multiple dimensions.

Stage 4: Application extension. When Kafka was first opened to the company's business lines, the lack of a unified usage specification led to incorrect use by some business teams. To solve this pain point, we built a real-time subscription platform that empowers business teams in the form of an application service, automating the workflow of production and consumption applications, platform-side user authorization, and usage and alerting monitoring, forming a closed loop from the demand side's request to full control over the resources.

Here are a few key points.

Core practice

1. Upgrade the version

The big data platform had been using Kafka 0.8.3 since its early days. At the time of writing, the latest official Kafka release is 2.3, so we inevitably ran into many bottlenecks and problems during the long period of running 0.8.

For example, here are some common problems with previous versions:

  • Lack of security support: data security issues, and no way to apply fine-grained resource management through authentication and authorization
  • Under-replicated brokers: a broker would be found in an under-replicated state, but the cause was unclear and hard to resolve
  • Newer features unavailable: transactional messages, idempotent messages, message timestamps, message lookup, etc.
  • Clients relied on ZooKeeper for offset management, putting excessive load on ZooKeeper and increasing operational complexity
  • Limited monitoring metrics: for example, topic, partition, and broker data-size metrics were missing, and monitoring tools such as Kafka Manager did not support such an early Kafka version well

At the same time, we researched the features of candidate target versions, for example:

  • Version 0.9 added quotas and security, with authentication and authorization being the features we cared about most
  • Version 0.10 added fine-grained message timestamps, so the offset for a desired timestamp can be looked up quickly. This is extremely important for data replay in real-time processing that uses Kafka as the data source
  • Version 0.11 added idempotence and transaction support and resolved replica data loss/inconsistency issues (see the producer sketch after this list)
  • Version 1.1 brought operational improvements. For example, controlled shutdown of a broker by the Controller, previously a long and complex process, was greatly improved starting in version 1.0
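To make the 0.11 features concrete, below is a minimal sketch of an idempotent, transactional producer using the standard Java client; the broker address, transactional ID, and topic name are placeholders rather than our actual configuration.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TransactionalProducerDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder address
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");          // no duplicates on producer retry
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "demo-txn-id");     // enables atomic multi-message writes

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("demo-topic", "key-1", "value-1"));
            producer.send(new ProducerRecord<>("demo-topic", "key-2", "value-2"));
            producer.commitTransaction();                                     // both records become visible atomically
        } catch (Exception e) {
            producer.abortTransaction();                                      // readers with read_committed never see aborted data
        } finally {
            producer.close();
        }
    }
}
```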

We ultimately chose version 1.1 because of Camus's compatibility with it and because 1.1 already supports the new features that matter in our usage scenarios. A quick word about Camus: it is a component also open-sourced by LinkedIn, and in our big data platform it is the main way we dump Kafka data to HDFS.

2. Resource isolation

Previously, because the business was simple and small in scale, the big data platform's Kafka cluster partitioning was fairly coarse. Over time, data from all of the company's businesses ended up mixed together: improper use of one business's topic could overload some brokers and affect other, healthy services, and the failure of certain brokers could even impact the whole cluster, making the service unavailable to the entire company.

To address these problems, we carried out two practices during the cluster transformation:

  • Splitting independent clusters by functional attributes
  • Topic-granularity resource isolation within a cluster

(1) Cluster splitting

We split out multiple physical Kafka clusters along functional lines to isolate services and reduce operational complexity.

Taking tracking (buried-point) data, the most important use case, as an example, it is currently split across three types of clusters, with each cluster's role defined as follows:

  • Log cluster: tracking data from every client first lands in this cluster, so ingestion must not be interrupted by Kafka problems, which places high demands on availability. The cluster therefore does not offer external subscriptions, keeping its consumers controllable. It also serves as the source for offline collection: Camus dumps its data to HDFS at hourly granularity, and that data feeds subsequent offline computation.

  • Full subscription cluster: most of the topics in this cluster are synchronized in real time from the Log cluster (a minimal relay sketch follows this list). As mentioned above, the Log cluster's data is not exposed externally, so this cluster takes over the consumer-subscription role. It is currently used mainly by the platform's internal real-time tasks, which analyze data from multiple business lines and provide analysis services.

  • Personalized custom cluster: as mentioned earlier, we can split and merge log data sources according to business needs, and we also support customized topics. This cluster only needs to provide storage for those derived topics.
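The article does not name the component that mirrors Log-cluster topics into the full subscription cluster; conceptually it is a consume-then-forward relay (Kafka's MirrorMaker does exactly this). Below is a minimal sketch of the idea with the plain Java clients, using placeholder cluster addresses and a hypothetical topic name.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class ClusterRelay {
    public static void main(String[] args) {
        Properties src = new Properties();
        src.put("bootstrap.servers", "log-cluster:9092");            // placeholder: Log cluster
        src.put("group.id", "relay-to-subscription-cluster");
        src.put("key.deserializer", ByteArrayDeserializer.class.getName());
        src.put("value.deserializer", ByteArrayDeserializer.class.getName());

        Properties dst = new Properties();
        dst.put("bootstrap.servers", "subscription-cluster:9092");   // placeholder: full subscription cluster
        dst.put("key.serializer", ByteArraySerializer.class.getName());
        dst.put("value.serializer", ByteArraySerializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(src);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(dst)) {
            consumer.subscribe(Collections.singletonList("app-event-log"));   // hypothetical topic name
            while (true) {
                ConsumerRecords<byte[], byte[]> records = consumer.poll(100);
                for (ConsumerRecord<byte[], byte[]> r : records) {
                    // Forward each record to the same-named topic on the target cluster
                    producer.send(new ProducerRecord<>(r.topic(), r.key(), r.value()));
                }
            }
        }
    }
}
```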

The overall cluster division is shown in the figure below:

(2) Resource isolation

A topic's traffic volume is an important basis for resource isolation within a cluster. For example, the two highest-volume tracking log sources in our business are the server-side tracking source (server-Event) and the mobile-side tracking event source; we want to avoid the partitions of these two topics being allocated to the same brokers in the cluster. By physically isolating different topics onto different brokers, we avoid traffic skew on any single broker.
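One way to express this kind of broker-level isolation is to pass an explicit replica assignment when creating a heavy topic, so that its partitions only ever live on a dedicated set of brokers. Below is a sketch using the Java AdminClient; the broker IDs, address, and topic name are hypothetical.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class IsolatedTopicCreation {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");   // placeholder address

        // Map each partition to an explicit list of broker IDs (the first entry is the preferred leader).
        // Here partitions 0-2 of a heavy topic are pinned to brokers 10, 11 and 12 only,
        // so its traffic never lands on brokers that host other product lines' topics.
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, Arrays.asList(10, 11));
        assignment.put(1, Arrays.asList(11, 12));
        assignment.put(2, Arrays.asList(12, 10));

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("server-event-demo", assignment);   // hypothetical topic name
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}
```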

3. Permission control and monitoring alarms

(1) Permission control

As mentioned at the beginning, the early Kafka clusters ran without any security authentication, so anyone with a broker's connection address could produce and consume, which posed serious data security problems.

Generally, SASL users tend to choose Kerberos, but for the platform's Kafka clusters the user model is not complex enough to justify Kerberos, and Kerberos's own complexity would introduce other problems. As for encryption, SSL is not used because the clusters run inside the internal network.

In the end, the platform's Kafka clusters use SASL for authentication; the lightweight SASL/SCRAM + ACL combination enables dynamic user creation while keeping data secure.
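On the client side, a producer or consumer then only needs the SASL/SCRAM settings shown below to authenticate. This is a minimal sketch with placeholder credentials, addresses, and topic, not our real account setup.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ScramConsumerConfig {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");   // placeholder address
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");              // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        // SASL/SCRAM over plaintext: authentication without TLS, matching an internal-network deployment
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        props.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        props.put(SaslConfigs.SASL_JAAS_CONFIG,
                "org.apache.kafka.common.security.scram.ScramLoginModule required "
                        + "username=\"demo-user\" password=\"demo-password\";");   // placeholder credentials

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("demo-topic"));          // only succeeds if an ACL grants READ
        consumer.close();
    }
}
```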

(2) Monitor alarms

In our earlier cluster experience, we often found that consumer applications fell behind for no obvious reason. When a consumer reads data that is no longer in memory, the broker's kernel has to load it from disk into the page cache before returning it, so disks that should be serving writes end up serving reads, which hurts other users' reads and writes and degrades cluster performance.

We therefore need to identify lagging consumer applications and intervene early to reduce such problems, so monitoring and alerting matter greatly to both the platform and its users. Our approach is introduced below.

Overall plan:

The solution is based on the open-source stack of Kafka JMX Metrics + Open-Falcon + Grafana:

  • Kafka JMX Metrics: the Kafka broker's internal metrics are exposed externally via JMX. Version 1.1.1 provides rich metrics that meet our monitoring needs (see the sketch after this list)
  • Open-Falcon: an enterprise-grade, highly available, and scalable open-source monitoring system developed by Xiaomi
  • Grafana: the well-known metrics visualization system, which can connect to multiple metrics data sources
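For illustration, the sketch below reads a single broker metric over JMX the way an agent such as falcon-agent would; the JMX port, host, and the chosen MBean (the broker's messages-in rate) are examples, not our exact configuration.

```java
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class BrokerJmxProbe {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled, e.g. JMX_PORT=9999
        JMXServiceURL url = new JMXServiceURL(
                "service:jmx:rmi:///jndi/rmi://broker1:9999/jmxrmi");          // placeholder host/port
        JMXConnector connector = JMXConnectorFactory.connect(url);
        try {
            MBeanServerConnection conn = connector.getMBeanServerConnection();
            ObjectName mbean = new ObjectName(
                    "kafka.server:type=BrokerTopicMetrics,name=MessagesInPerSec");
            Object rate = conn.getAttribute(mbean, "OneMinuteRate");           // broker-wide messages-in rate
            System.out.println("messages in per second (1m avg): " + rate);
        } finally {
            connector.close();
        }
    }
}
```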

About monitoring:

  • falcon-agent: deployed on every broker to collect the Kafka JMX metrics and report the data
  • Grafana: visualizes the Kafka metrics collected by Falcon, with dashboards at the cluster, broker, topic, and consumer levels
  • Eagle: obtains the active status and lag of consumer groups, and exposes APIs that supply monitoring data to the alerting system Radar

About alarms:

Radar system: an in-house monitoring system that obtains Kafka metrics through Falcon and Eagle and raises alarms against configured thresholds. Take consumption as an example: lag is an important indicator of whether consumption is healthy, and if lag keeps increasing it must be dealt with.

When a problem occurs, not only the consumer's administrator but also its users need to be notified by the alerting system, so an enterprise WeChat alert bot automatically notifies the owner or users of the affected consumer group as well as the Kafka cluster administrators.
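For reference, lag is simply the gap between each partition's log end offset and the group's committed offset. The sketch below shows one way a monitoring job could compute it with the plain Java consumer; the group ID, topic, and address are placeholders, and Eagle/Radar's actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLagProbe {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");               // placeholder address
        props.put("group.id", "monitored-group");                     // the consumer group being checked
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo p : consumer.partitionsFor("demo-topic")) {    // placeholder topic
                partitions.add(new TopicPartition(p.topic(), p.partition()));
            }
            Map<TopicPartition, Long> endOffsets = consumer.endOffsets(partitions);
            for (TopicPartition tp : partitions) {
                OffsetAndMetadata committed = consumer.committed(tp);         // last offset the group committed
                long lag = endOffsets.get(tp) - (committed == null ? 0L : committed.offset());
                System.out.println(tp + " lag=" + lag);                       // alert when lag keeps growing
            }
        }
    }
}
```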

Monitoring examples:

4. Application extension

(1) Real-time data subscription platform

The real-time data subscription platform is a system application that manages the whole Kafka workflow. Through ticket-based approval, it automates processes such as production and consumption applications, platform-side user authorization, and usage and alerting monitoring, and provides unified control.

The core idea is to build identity authentication and permission control around the Kafka data source, improving data security while also managing Kafka's downstream applications.

(2) Standardized application process

Whether for production or consumption needs, users first apply by submitting a ticket that includes the business line, the topic, the subscription mode, and other information. The ticket is then routed to the platform for approval; once approved, the applicant is issued an authorized account and the broker addresses. From that point on, the applicant can produce and consume normally.

(3) Monitor alarms

On the platform, permissions are tied to resources; a resource can be a topic for production or a consumer group plus topic for consumption. Once a permission is granted, the corresponding resources are automatically registered in our Radar monitoring system, which then monitors them throughout their life cycle.

(4) Data replay

To guarantee data integrity and accuracy, the Lambda architecture has become a common architectural approach in big data. On the other hand, the Lambda architecture also suffers from heavy resource usage and high development complexity.

The real-time subscription platform can reset a consumer group to any offset, supporting replay of real-time data by time, by offset, and in other ways, which provides support for Kappa-architecture scenarios and alleviates the pain points above.
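Conceptually, resetting a consumer group to a point in time means translating the timestamp into per-partition offsets and committing those offsets for the group. Below is a minimal sketch using offsetsForTimes, assuming the group's own consumers are stopped during the reset; topic, group, address, and the two-hour window are placeholders.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ReplayFromTimestamp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");        // placeholder address
        props.put("group.id", "replay-group");                  // group whose position is being reset
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer", ByteArrayDeserializer.class.getName());
        props.put("value.deserializer", ByteArrayDeserializer.class.getName());

        long replayFrom = System.currentTimeMillis() - 2 * 60 * 60 * 1000L;   // e.g. two hours ago

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            List<TopicPartition> partitions = new ArrayList<>();
            Map<TopicPartition, Long> query = new HashMap<>();
            for (PartitionInfo p : consumer.partitionsFor("demo-topic")) {    // placeholder topic
                TopicPartition tp = new TopicPartition(p.topic(), p.partition());
                partitions.add(tp);
                query.put(tp, replayFrom);
            }
            consumer.assign(partitions);

            // Translate the timestamp into the earliest offset whose message timestamp is >= replayFrom
            Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(query);
            Map<TopicPartition, OffsetAndMetadata> toCommit = new HashMap<>();
            for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : offsets.entrySet()) {
                if (e.getValue() != null) {
                    consumer.seek(e.getKey(), e.getValue().offset());
                    toCommit.put(e.getKey(), new OffsetAndMetadata(e.getValue().offset()));
                }
            }
            consumer.commitSync(toCommit);   // the group restarts consumption from the chosen point in time
        }
    }
}
```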

(5) Topic management

Why provide topic management? For a simple example, when a user wants to create their own Kafka topic on a cluster, we do not want them to log in to a node directly. So whether for users or administrators, the services above need an interface to operate through, since not everyone can be allowed to SSH into the servers.

We therefore built a service that provides these management capabilities through a unified entry point, covering topic creation, resource isolation (broker placement) designation, topic metadata management, and so on.
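As an illustration of the kind of operation such an entry point wraps, the sketch below reads a topic's partition layout and configuration through the Java AdminClient; the topic name and address are placeholders, and the real service exposes such operations behind its own interface and approval flow rather than giving users direct access.

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.config.ConfigResource;

public class TopicMetadataService {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092");                        // placeholder address

        try (AdminClient admin = AdminClient.create(props)) {
            // Partition/replica layout of the topic
            TopicDescription desc = admin.describeTopics(Collections.singleton("demo-topic"))
                    .all().get().get("demo-topic");
            System.out.println("partitions: " + desc.partitions().size());

            // Per-topic configuration (retention, cleanup policy, ...)
            ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, "demo-topic");
            Config config = admin.describeConfigs(Collections.singleton(resource))
                    .all().get().get(resource);
            System.out.println("retention.ms = " + config.get("retention.ms").value());
        }
    }
}
```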

(6) Data diversion

In the earlier architecture, each Kafka topic carried an entire log source, so consumers had to consume at the granularity of that whole source, even though many of them only need a small portion of it, perhaps just a few tracking events belonging to one application. Requiring every downstream application to write its own filtering logic wastes resources and is error-prone; there are also scenarios that need several data sources merged together.

For these two cases, we implemented topic splitting, merging, and customization driven by business needs, supporting merges across data sources and filtering rules built from arbitrary combinations of AppCode and EventCode conditions.
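The article does not say which engine performs the splitting; purely as an illustration, the sketch below expresses a single diversion rule as a Kafka Streams filter job. The topic names and the app_code/event_code JSON fields are hypothetical.

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;

public class EventDiversionJob {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "event-diversion-demo");   // placeholder app id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092");        // placeholder address
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> source = builder.stream("app-event-log");         // placeholder source topic

        // Keep only the events one subscriber cares about; here the rule is a simple
        // substring match on hypothetical AppCode/EventCode fields in a JSON payload.
        source.filter((key, value) ->
                        value != null
                                && value.contains("\"app_code\":\"travel_app\"")
                                && value.contains("\"event_code\":\"page_view\""))
              .to("travel_app-page_view");                                         // placeholder derived topic

        new KafkaStreams(builder.build(), props).start();
    }
}
```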

Part.3 Follow-up plan

  • Solving data duplication. To address duplicates caused by failure recovery and other factors in real-time stream processing, we are trying to combine Kafka's transaction mechanism with Flink's two-phase commit protocol to achieve end-to-end exactly-once semantics. It has been tested at small scale on the platform, and if it passes we will roll it out to the production environment.

  • Consumer rate limiting. In a mixed write/read scenario, if one consumer triggers a large amount of disk reads, the latency of produce requests and of other consumers is affected. Using Kafka's quota mechanism to throttle consumption, with dynamically adjusted thresholds, is one of our next steps.

  • Scenario extension. Building SDK, HTTP, and other message subscription and production methods on top of Kafka, to meet the needs of different language environments and scenarios.

That concludes our sharing on Kafka practice in the Hornet's Nest big data platform. If you have any suggestions or questions, feel free to leave a message for the Hornet's Nest technology public account.

Author: Bi Bo, R&D engineer on the Hornet's Nest big data platform.