I. Background introduction

(1) General architecture of a streaming platform

The general architecture of a streaming platform typically consists of a message queue, a computing engine, and storage, as shown in the figure below. Logs from clients or the web are collected into the message queue; the computing engine processes the message queue data in real time; and the real-time results are written to the real-time storage system as Append or Update records.
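To make this pipeline concrete, here is a minimal, hedged sketch of such a job in Flink's DataStream API. The topic name, broker address, and the trivial "ETL" step are illustrative assumptions, not our platform's actual configuration:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class LogPipelineSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Message queue: consume raw client/web logs from Kafka (topic and brokers are hypothetical).
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092");
        props.setProperty("group.id", "log-pipeline-demo");
        DataStream<String> rawLogs = env.addSource(
                new FlinkKafkaConsumer<>("client_logs", new SimpleStringSchema(), props));

        // Computing engine: a trivial real-time transformation standing in for ETL/aggregation.
        DataStream<String> cleaned = rawLogs.filter(line -> !line.isEmpty());

        // Storage: in practice this would be an Append/Update sink to a real-time store
        // (HBase, Redis, etc.); print() is just a stand-in so the sketch stays self-contained.
        cleaned.print();

        env.execute("kafka-to-flink-pipeline-sketch");
    }
}
```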

The message queue we commonly use is Kafka. We initially used Spark Streaming as the computing engine, but as Flink's advantages as a streaming engine became increasingly clear, we eventually settled on Flink as our unified real-time computing engine.

(2) Why Kafka?

Kafka is one of the earlier message queues, but it is very stable and has a large user base, including NetEase. The main reasons we chose Kafka as our messaging middleware are as follows:

· High throughput and low latency: hundreds of thousands of QPS with millisecond-level latency;

· High concurrency: thousands of clients can read and write simultaneously;

· Fault tolerance and high reliability: supports data replication and tolerates node failures;

· Scalability: supports online scale-out without affecting running services.

(3) Why Flink?

Apache Flink is an open-source big data streaming engine that has grown increasingly popular in recent years. It supports both batch and stream processing. The main factors in choosing Flink as our streaming computing engine are as follows:

· High throughput, low latency, and high performance;

· Highly flexible streaming windows;

· Exactly-once semantics for stateful computation;

· A lightweight fault-tolerance mechanism;

· Support for event time and out-of-order events;

· A unified stream and batch engine.

(4) The Kafka + Flink stream computing system

The outstanding performance of Kafka as message middleware and of Flink as a streaming engine gives rise to a stream computing platform built on the two, as shown in the figure below: logs generated in real time by the app and the web are collected into Kafka and then handed over to Flink for real-time computation such as ETL, global aggregation, and window aggregation.
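As a hedged illustration of the window-aggregation part of this system, the sketch below counts page views per page in one-minute event-time tumbling windows over a Kafka topic. The topic name, record format ("pageId,epochMillis"), and the ten-second out-of-orderness bound are assumptions made for the example, not our production settings:

```java
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

public class WindowAggregationSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.enableCheckpointing(60_000); // checkpointing backs Flink's exactly-once state semantics

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka-broker:9092");
        props.setProperty("group.id", "pv-window-agg");

        // Parse each record into (pageId, eventTimeMillis).
        DataStream<Tuple2<String, Long>> events = env
                .addSource(new FlinkKafkaConsumer<>("web_logs", new SimpleStringSchema(), props))
                .map(line -> {
                    String[] parts = line.split(",");
                    return Tuple2.of(parts[0], Long.parseLong(parts[1]));
                })
                .returns(Types.TUPLE(Types.STRING, Types.LONG))
                // Event time with tolerance for out-of-order records.
                .assignTimestampsAndWatermarks(
                        new BoundedOutOfOrdernessTimestampExtractor<Tuple2<String, Long>>(Time.seconds(10)) {
                            @Override
                            public long extractTimestamp(Tuple2<String, Long> e) {
                                return e.f1;
                            }
                        });

        // One-minute tumbling-window aggregation: page views per page.
        events.map(e -> Tuple2.of(e.f0, 1L))
              .returns(Types.TUPLE(Types.STRING, Types.LONG))
              .keyBy(e -> e.f0)
              .window(TumblingEventTimeWindows.of(Time.minutes(1)))
              .sum(1)
              .print();

        env.execute("kafka-window-aggregation-sketch");
    }
}
```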

(5) Current use of Kafka at NetEase Cloud Music

We currently have more than 10 Kafka clusters, each with a different primary role: some serve as business clusters, some as mirror clusters, some as computing clusters, and so on. The total number of Kafka nodes exceeds 200, and the peak QPS of a single Kafka cluster exceeds 4 million. NetEase Cloud Music currently runs 500+ real-time tasks based on Kafka + Flink.

II. Flink + Kafka platform design

Given the situation above, we wanted to build a platform around Kafka + Flink to reduce users' development and operations costs. In fact, we started building a Flink-based real-time computing platform in 2018, in which Kafka plays an important role. This year we refactored how the platform integrates Flink and Kafka to make both even easier for users to work with.

Based on Flink 1.0, we rebuilt the platform as the Magina version. At the API level, we provide Magina SQL and the Magina SDK, covering both DataStream and SQL operations. A custom Magina SQL parser converts SQL into a LogicalPlan and then converts the LogicalPlan into physical execution code. During this process, the catalog connects to the metadata management center to obtain metadata. When Kafka is used, its metadata is registered in the metadata center, and real-time data is accessed in the form of flow tables. The use of Kafka in Magina has three main parts:

· Clusters are registered as catalogs;

· Topics are exposed as flow tables;

· Messages are schematized.

Users can register table or catalog information in the metadata management center, where Kafka tables are created and maintained in the database; they then simply reference the tables they need. The figure below shows the main reference logic for a Kafka flow table.
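The Magina flow-table abstraction itself is internal, but the idea can be sketched with Flink's own Kafka connector DDL (available in recent Flink versions): a Kafka topic is registered as a table and downstream users query it by name. The table name, schema, and connector options below are illustrative assumptions:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaFlowTableSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register a Kafka topic as a flow table. In Magina this definition lives in the
        // metadata center; here it is declared inline with Flink's Kafka connector DDL.
        tEnv.executeSql(
                "CREATE TABLE user_play_log (" +
                "  user_id BIGINT," +
                "  song_id BIGINT," +
                "  play_time TIMESTAMP(3)" +
                ") WITH (" +
                "  'connector' = 'kafka'," +
                "  'topic' = 'user_play_log'," +                          // hypothetical topic
                "  'properties.bootstrap.servers' = 'kafka-broker:9092'," +
                "  'properties.group.id' = 'magina-demo'," +
                "  'scan.startup.mode' = 'latest-offset'," +
                "  'format' = 'json'" +
                ")");

        // Downstream users simply reference the table by name, like any other table.
        tEnv.executeSql("SELECT song_id, COUNT(*) AS plays FROM user_play_log GROUP BY song_id")
            .print();
    }
}
```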

III. The application of Kafka in the real-time data warehouse

(1) Evolution through problem solving

In the process of using Kafka in the real-time data warehouse, we encountered various problems and tried different solutions.

In the platform's early days, there were only two clusters for real-time computing and one collection cluster, and each Topic carried a large amount of data. Many different real-time tasks consumed the same high-volume Topics.

As a result, the Kafka clusters came under heavy pressure, with frequent delays and I/O spikes.

To solve these problems, we considered distributing the large Topics in real time. Based on Flink 1.5, we designed the data distribution program shown in the figure below, which became the prototype of our real-time data warehouse. Splitting a large Topic into smaller Topics greatly reduced cluster pressure and improved performance. In addition, we originally used static distribution rules, so adding a rule later required restarting the task, which had a sizable impact on the business; we therefore moved to dynamic rules for the data distribution task.
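As a hedged sketch of such a distribution job, the code below reads a large topic and routes each record to a smaller target topic chosen per record. The routing rule is hard-coded here purely for illustration; in the real platform the rules are loaded dynamically rather than compiled in. Topic names and broker addresses are likewise hypothetical:

```java
import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TopicDistributionSketch {

    // Routing rule: pick a small target topic from the log content. In production this
    // would be driven by dynamic rules (e.g. broadcast state or a config center).
    static String targetTopic(String log) {
        return log.contains("\"type\":\"play\"") ? "ods_play_log" : "ods_other_log";
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "kafka-broker:9092");
        consumerProps.setProperty("group.id", "topic-distribution");
        DataStream<String> bigTopic = env.addSource(
                new FlinkKafkaConsumer<>("all_logs", new SimpleStringSchema(), consumerProps));

        Properties producerProps = new Properties();
        producerProps.setProperty("bootstrap.servers", "kafka-broker:9092");

        // The serialization schema decides the destination topic per record,
        // so one job fans a large Topic out into several smaller ones.
        KafkaSerializationSchema<String> routing = (element, timestamp) ->
                new ProducerRecord<>(targetTopic(element), element.getBytes(StandardCharsets.UTF_8));

        bigTopic.addSink(new FlinkKafkaProducer<>(
                "ods_other_log",                   // default topic, only used as a fallback
                routing,
                producerProps,
                FlinkKafkaProducer.Semantic.AT_LEAST_ONCE));

        env.execute("big-topic-distribution-sketch");
    }
}
```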

After the initial problems were addressed, new Kafka issues emerged as the platform evolved:

· Although the clusters were expanded, the workload kept growing, so pressure on the Kafka clusters continued to rise;

· I/O-related problems occasionally appeared as cluster pressure rose, and consuming tasks tended to interfere with one another;

· Because no intermediate data was materialized when users consumed different Topics, repeated consumption was common;

· Migrating tasks between Kafka clusters was difficult.

To address these problems, we implemented Kafka cluster isolation and data layering as follows. In simple terms, the clusters are divided into a DS cluster, a log collection cluster, and a distribution cluster. Data is routed by the distribution service to Flink for processing, cleaned, and then written to the DW cluster; while it is written to DW, it is also synchronized to a mirror cluster. In this process, Flink is also used for real-time statistics and joining, and the resulting ADS data is written into an online ADS cluster and a statistical ADS cluster. This procedure ensures that tasks with strict real-time requirements are not affected by statistical reporting.

However, as data is spread across different clusters, new problems inevitably arise:

· How do we monitor the state of the Kafka clusters?

· How do we quickly diagnose abnormal consumption by a job?

To answer these two questions, we built a Kafka monitoring system. Its monitoring is divided into the following two dimensions, so that when an anomaly occurs we can pinpoint the details of the problem:

· Cluster overview monitoring: we can see the number of Topics and running tasks for each cluster, as well as the data consumed per Topic, the data inflow rate, the total inflow, and the average message size;

· Metric monitoring: for each Flink task we can see the corresponding Topic, GroupID, cluster, start time, input bandwidth, InTPS, OutTPS, consumption latency, and Lag.
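Our monitoring system is internal, but the Lag dimension can be illustrated with Kafka's standard client APIs: the committed offsets of a consumer group are compared against the latest log-end offsets, partition by partition. The broker address and group id below are placeholders:

```java
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class ConsumerLagSketch {
    public static void main(String[] args) throws Exception {
        String bootstrap = "kafka-broker:9092";   // hypothetical cluster address
        String groupId = "magina-flink-job";      // hypothetical Flink consumer group

        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrap);

        try (AdminClient admin = AdminClient.create(props)) {
            // Offsets the consumer group has committed, per partition.
            Map<TopicPartition, OffsetAndMetadata> committed =
                    admin.listConsumerGroupOffsets(groupId)
                         .partitionsToOffsetAndMetadata()
                         .get();

            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", bootstrap);
            consumerProps.put("group.id", groupId + "-lag-probe");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(
                    consumerProps, new ByteArrayDeserializer(), new ByteArrayDeserializer())) {
                // Latest offsets in the log, per partition.
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());

                // Lag = log-end offset minus committed offset.
                Map<TopicPartition, Long> lag = committed.entrySet().stream()
                        .collect(Collectors.toMap(
                                Map.Entry::getKey,
                                e -> endOffsets.get(e.getKey()) - e.getValue().offset()));
                lag.forEach((tp, l) -> System.out.printf("%s lag=%d%n", tp, l));
            }
        }
    }
}
```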

(2) The application of Flink + Kafka in the Lambda architecture

Unifying stream and batch processing is a hot topic right now, and many companies are considering it. The commonly used architectures are Lambda and Kappa. True stream-batch unification requires unifying both the storage and the computing engine; since our current infrastructure has no unified storage, we can only adopt the Lambda architecture.

The following figure shows the concrete practice of the Lambda architecture based on Flink and Kafka at Cloud Music. The upper layer is real-time computing and the lower layer is offline computing; horizontally the diagram is divided by computing engine, and vertically by data warehouse layer.

IV. Problems and improvements

In practice we also ran into many problems; the two main ones are:

· Repeated consumption of a Kafka Source when a job has multiple sinks;

· Computation delays caused by traffic surges on the same switch.

(1) Repeated consumption of Kafka Source with multiple sinks

The Magina platform supports multiple sinks, which means any intermediate result can be written to different storage systems within one job. For example, if we insert different parts of the same intermediate result into different stores, multiple DAGs are generated. Even though these are only temporary results, each DAG consumes the Kafka Source again, which wastes a great deal of performance and resources.

So we wondered whether we could avoid consuming these temporary intermediate results more than once. Before version 1.9, we reconstructed the StreamGraph and merged the DAGs of the three DataSources. In version 1.9, Magina itself provides a query and Source merge optimization. However, we found that multiple Source references to the same table are only merged automatically when they belong to the same data update; if they are not in the same update, they are not merged immediately. After 1.9, we added a buffer for modifyOperations to solve this problem.
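Our modifyOperations buffering is internal to Magina, but later Flink versions expose a similar idea through the StatementSet API: several INSERT statements are buffered and translated together as one job, so a shared source is scanned only once. The sketch below uses the built-in datagen and blackhole connectors so it stays self-contained; in practice the source would be a Kafka table like the one registered earlier, and the table names are illustrative:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public class MultiSinkSingleSourceSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

        // Stand-ins for a Kafka flow table and two external sinks.
        tEnv.executeSql("CREATE TABLE kafka_source (user_id BIGINT, song_id BIGINT) " +
                "WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE sink_a (user_id BIGINT, song_id BIGINT) " +
                "WITH ('connector' = 'blackhole')");
        tEnv.executeSql("CREATE TABLE sink_b (song_id BIGINT, plays BIGINT) " +
                "WITH ('connector' = 'blackhole')");

        StatementSet statements = tEnv.createStatementSet();
        // Both INSERTs are buffered instead of being translated one by one ...
        statements.addInsertSql("INSERT INTO sink_a SELECT user_id, song_id FROM kafka_source");
        statements.addInsertSql(
                "INSERT INTO sink_b SELECT song_id, COUNT(*) FROM kafka_source GROUP BY song_id");
        // ... so execute() plans them as a single job and the source is consumed only once.
        statements.execute();
    }
}
```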

(2) Computation delays caused by traffic surges on the same switch

This is a recent problem, and it may involve not just the same switch but the same machine room. We deployed many machines on the same switch, some hosting Kafka clusters and some hosting Hadoop clusters. On top of Hadoop we run Spark and Hive for offline computing, as well as Flink for real-time computing, which also consumes Kafka. We found that running tasks would suddenly show overall delays; after troubleshooting we found no other anomalies, except that traffic on the switch spiked at a certain point in time. Further investigation showed that offline computation had saturated the switch's limited bandwidth, which affected Flink's real-time computation.

To solve this problem, we considered separating offline and real-time clusters by optimizing switch or machine deployment, for example giving the offline clusters their own switch and the Kafka and Flink clusters their own switch, so that the two do not interfere with each other at the hardware level.

V. Q & A

Q1: Is Kafka data reliable in real-time data warehouse?

A1: The answer depends largely on how you define data accuracy; different criteria yield different answers. You first need to define under what circumstances the data counts as reliable, and then have a good fault-tolerance mechanism in place during processing.

Q2: How do we learn about these problems in an enterprise setting? How do we accumulate this experience?

A2: I personally believe learning is problem-driven: when I encounter a problem I try to think it through and solve it, and I accumulate experience, and note shortcomings, in the process of solving it.

Q3: How do you deal with abnormal data in Kafka? Is there a detection mechanism?

A3: Our distribution service detects, according to configured rules, which records are abnormal and which are normal, and routes the abnormal records to a dedicated abnormal Topic. Users can later query those abnormal Topics by the relevant metrics and keywords.
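The actual detection rules are part of our internal distribution service; as a hedged sketch of the pattern, the code below uses a Flink side output to split abnormal records from normal ones, with a deliberately trivial "looks like JSON" rule. In production both streams would be written back to Kafka (business Topic and abnormal Topic respectively):

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class AbnormalDataRoutingSketch {
    // Side-output tag for records judged abnormal. The {} keeps the generic type after erasure.
    private static final OutputTag<String> ABNORMAL = new OutputTag<String>("abnormal") {};

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Stand-in input; in the real distribution service this would come from Kafka.
        DataStream<String> logs = env.fromElements("{\"user_id\":1}", "not-json", "{\"user_id\":2}");

        SingleOutputStreamOperator<String> normal = logs.process(new ProcessFunction<String, String>() {
            @Override
            public void processElement(String value, Context ctx, Collector<String> out) {
                // Illustrative rule: anything that does not look like JSON is abnormal.
                if (value.startsWith("{") && value.endsWith("}")) {
                    out.collect(value);
                } else {
                    ctx.output(ABNORMAL, value);
                }
            }
        });

        // Normal records go on to the business Topic; abnormal ones to an abnormal Topic.
        normal.print("normal");
        normal.getSideOutput(ABNORMAL).print("abnormal");

        env.execute("abnormal-data-routing-sketch");
    }
}
```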

Finally

Thank you for reading this far. If the article falls short anywhere, feel free to point it out; and if you found it helpful, please give it a thumbs up.