This article is shared by Sun Xiaoguang, head of Zhihu's technology platform, and introduces the construction practice of Zhihu's Flink-based data integration platform. The main contents are:
- Business scenarios
- Historical design
- The full shift to Flink
- Future plans for Flink application scenarios
1. Business scenarios
I am very glad to share some of what we gained in the process of rebuilding our previous-generation data integration platform on Flink. As the link between heterogeneous data, a data integration platform needs to connect a wide variety of storage systems, and different technology stacks and business scenarios place different design requirements on it.
Let's first look at the business scenarios of Zhihu's internal data integration. Like many Internet companies, Zhihu's online storage used to be mainly MySQL and Redis, with HBase serving some businesses with very large data volumes. As technology has evolved in recent years, we have started migrating from MySQL to TiDB, and similarly from HBase to Zetta, which is built on the TiKV stack. The vast majority of offline storage scenarios are supported by Hive tables.
From online storage to offline storage, there are very strong data synchronization requirements. There is also a lot of streaming data, such as data in messaging systems, that we want to be able to connect to a variety of online and offline storage systems. In the past, Zhihu mainly used Kafka to support streaming data. Recently, Pulsar has also been introduced. There are strong requirements for data exchange between the two messaging systems and storage systems.
Given Zhihu's business scenarios and current stage of development, data integration faces challenges in both technology and process management.
- First, from a technical point of view, the diversity of data sources places higher demands on the connector extensibility of the data integration system. In addition, the next-generation storage systems not only bring new capabilities to the business but also relieve pressure on it, which in turn accelerates data growth. This rapid growth in data volume places higher demands on the throughput and timeliness of the data integration platform. And of course, as a foundational data system, data accuracy is the most basic requirement and one we must get right.
- Second, from the perspective of process management, we need to understand and integrate data scattered across different business teams, manage it well, and ensure the security of data access, so the overall data integration process is relatively complex. While platformization can automate complex processes, it cannot completely eliminate the costs inherent in data integration work. Therefore, improving the reusability and manageability of the process as much as possible is a continuous challenge for a data integration system.
Based on the challenges in these two directions, we planned the design objectives of the data integration platform.
- From a technical perspective, we need to support all the storage systems Zhihu has already put into use and those it will adopt in the future, with the ability to integrate the diverse data in these systems. In addition, we need to ensure the reliability and accuracy of data integration while maintaining high throughput and low scheduling delay.
- In terms of process, we want to integrate the metadata and scheduling systems of the various internal storage systems, reusing existing infrastructure capabilities, simplifying the data access workflow and reducing the cost of onboarding users. We also hope to give users the means to meet their data needs themselves in a platform-based way, improving the overall efficiency of data integration.
- From the point of view of task manageability, we also need to maintain data lineage, so that businesses can better understand the relationships between data outputs, evaluate their business value more effectively, and avoid low-quality and duplicated data integration work. Finally, we need to provide systematic monitoring and alerting for all tasks to ensure the stability of data output.
2. Historical design
Before Zhihu's first-generation data integration platform took shape, a large number of tasks were scattered across crontabs maintained by individual business teams or in home-grown scheduling systems. In such an unmanaged state, the reliability and data quality of these integration tasks were hard to guarantee. What we most urgently needed to solve at that stage was the management problem: making the data integration process manageable and monitorable.
As a result, we first consolidated the metadata systems of the various storage systems so that everyone could see all of the company's data assets in one unified place. The synchronization tasks over these data were then centrally managed in a scheduling center responsible for task dependency management. The scheduling center also monitors key task metrics and provides alerting. At this stage we used Sqoop, which had been widely proven, for data synchronization between MySQL and Hive. Later in the platform's construction, as streaming data synchronization requirements emerged, we introduced Flink to synchronize Kafka data to HDFS.
When building the first-generation integration platform, we had to choose between continuing with the widely proven Sqoop and migrating to an alternative solution. Compared to Sqoop, DataX is a very strong competitor in this field. A side-by-side comparison of the two shows that each has advantages in areas where the other falls short.
- Sqoop, for example, has MapReduce-grade scalability at the system level and native Hive support. Its disadvantages are limited data source coverage and the absence of some important features.
- DataX provides very rich data source support, has built-in rate limiting, which is very important for a data integration system, and its good design makes it easy to customize and extend. However, it lacks cluster resource management support and native Hive catalog support.
At the time, neither product had an absolute advantage over the other. We chose to continue with Sqoop, which saved us a lot of investment in validation, so the first-generation data integration platform was completed in a very short time.
With the launch and maturing of the first-generation data integration platform, it supported the company's data integration needs well and delivered significant benefits. To date, there are about 4,000 tasks on the platform, running more than 6,000 task instances per day and synchronizing about 8.2 billion rows of data, totaling 124 terabytes.
With the help of the platform, the data access process is greatly simplified, and users can resolve their data integration requirements by themselves. In addition, the platform adds the necessary governance constraints and security reviews at key steps of the process, which not only raises the level of management but also significantly improves overall security and data quality.
With the elastic capabilities of Yarn and K8s, the scalability of integration tasks has also been greatly improved. Of course, as a first-generation system solving the zero-to-one problem, it inevitably had a number of problems, such as:
- The high scheduling latency inherent in Sqoop's MapReduce mode
- Data skew caused by uneven distribution of business data
- Issues that went unresolved for long periods because of an inactive community
- Poor extensibility and manageability stemming from the weak design of the Sqoop codebase
3. The shift to Flink
In contrast to Sqoop, Flink, which we used for the Kafka-to-HDFS data integration tasks, earned more of our trust with its excellent reliability and flexible customizability. Based on the confidence Flink built up in streaming data integration, we began to attempt a full shift to Flink to build the next generation of the data integration platform.
Although Flink was the best candidate for this evolution, we took another look at the technology options available on the market at the time. This time we compared Apache NiFi with Flink in many respects. From a functional perspective:
- Apache NiFi is very powerful and fully covers our data integration needs. But precisely because it is so powerful and self-contained as a system, it also has a high barrier to integration. In addition, its inability to use the existing Yarn and K8s resource pools would incur extra costs in building and maintaining a separate resource pool.
- In contrast, Flink has a very active and open community and already had very rich data source support when we started the project, and it was reasonable to expect its data source coverage to become even more comprehensive over time. Moreover, as a general-purpose compute engine, Flink has a powerful and easy-to-use API on top of which secondary development is very easy, so its advantage in extensibility is also prominent.
Finally, given our belief in the goal of unified batch and stream processing, unifying Zhihu's big data compute engine stack in the future is also an attractive prospect.
Based on these considerations, in this iteration we chose Flink to fully replace Sqoop: we reimplemented all of Sqoop's functionality on Flink and rebuilt a new integration platform on top of it.
As shown in the figure below, the orange parts are what changed in this iteration. In addition to Flink as the protagonist, we also developed data integration support for the TiDB, Redis and Zetta storage systems during this iteration; on the messaging side, Pulsar support comes directly from the community. By the time we started development, Flink had evolved to a fairly mature stage with native Hive support built in, so the migration went smoothly without too many technical difficulties.
The migration to Flink has brought us many benefits.
1. First of all, maintainability improved very significantly compared to Sqoop. As shown below, on the left is the old Sqoop task definition: a pile of unstructured, error-prone raw commands. With Flink, a task is defined with SQL: a source table, a target table, and an insert statement (see the sketch below). Tasks are far easier to understand and debug than before and become a pattern that end users can follow; many problems no longer need the platform developers' help to troubleshoot, and users can resolve many common task exceptions themselves.
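As an illustration of this SQL-based task definition, here is a minimal, self-contained sketch using Flink's Table API. The table names, schemas and connection settings are placeholders rather than Zhihu's actual configuration, and the filesystem sink stands in for the platform's Hive catalog integration.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class MySqlToHiveSyncJob {
    public static void main(String[] args) {
        // Batch-mode table environment for a one-shot synchronization task.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Source table: a MySQL table exposed through the JDBC connector.
        tEnv.executeSql(
                "CREATE TABLE src_orders (" +
                "  id BIGINT," +
                "  user_id BIGINT," +
                "  amount DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'jdbc'," +
                "  'url' = 'jdbc:mysql://mysql-host:3306/shop'," +
                "  'table-name' = 'orders'," +
                "  'username' = 'sync'," +
                "  'password' = '***')");

        // Target table: written as Parquet files on HDFS for brevity.
        tEnv.executeSql(
                "CREATE TABLE dst_orders (" +
                "  id BIGINT," +
                "  user_id BIGINT," +
                "  amount DECIMAL(10, 2)" +
                ") WITH (" +
                "  'connector' = 'filesystem'," +
                "  'path' = 'hdfs:///warehouse/shop/orders'," +
                "  'format' = 'parquet')");

        // The synchronization task itself is a single INSERT statement.
        tEnv.executeSql("INSERT INTO dst_orders SELECT id, user_id, amount FROM src_orders");
    }
}
```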
2. In terms of performance, we also made many targeted optimizations.
2.1 Scheduling Policies
The first is the optimization of the scheduling strategy. In the first-generation integration platform we only used Flink to synchronize streaming data, so task scheduling used Per-Job mode exclusively. The platform now supports a mix of Session and Per-Job scheduling modes: streaming tasks that pull data from the message systems continue to run in Per-Job mode, while batch synchronization tasks reuse clusters in Session mode, avoiding time-consuming cluster startup and improving synchronization efficiency (a sketch of this routing follows).
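A minimal sketch of how such routing might be expressed on the submission side. The option key and values are standard Flink configuration, but the helper class and its routing logic are a hypothetical simplification of the platform's scheduler, not its actual code.

```java
import org.apache.flink.configuration.Configuration;

public class ExecutionModeRouter {

    // Returns a Flink configuration whose execution target matches the task type.
    public static Configuration configFor(boolean isStreamingTask) {
        Configuration conf = new Configuration();
        if (isStreamingTask) {
            // Long-running streaming sync: a dedicated per-job cluster on Yarn.
            conf.setString("execution.target", "yarn-per-job");
        } else {
            // Short batch sync: reuse a pre-started session cluster to avoid
            // paying cluster start-up time on every run.
            conf.setString("execution.target", "yarn-session");
        }
        return conf;
    }
}
```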
Of course, using Session clusters in such scenarios also presents a number of challenges, for example resource requirements that change as workloads fluctuate with task submissions. We therefore built an automatic scaling mechanism to help Session clusters cope with the changing load. In addition, to simplify billing and isolate risk, we created dedicated Session clusters for each line of business, serving that line's own data integration tasks.
2.2 Databases
In terms of relational databases, we use the common JDBC approach for MySQL data synchronization, but this approach also has some inherent problems that are difficult to solve.
- For example, data skew caused by the uneven distribution of business data along the primary key dimension.
- Another example is the resource waste and administrative cost of building dedicated synchronization replicas to isolate online and offline workloads.
- In addition, with a large number of MySQL instances of varying specifications, it is very difficult to sensibly coordinate concurrent tasks across instances and the hosts they run on, and to apply reasonable rate control.
Considering the company-wide trend of migrating data from MySQL to TiDB, we developed a native TiDB Flink connector to take full advantage of TiDB's architecture.
- First, a region-level load balancing strategy ensures that synchronization tasks are split at region granularity, avoiding data skew regardless of table structure and data distribution (see the sketch after this list).
- Second, by setting a replica placement policy, a follower replica can be placed uniformly in the offline data center. The Follower Read capability is then used to isolate online transaction load from data extraction load while keeping the number of replicas unchanged, so no additional resource cost is incurred.
- Finally, we also introduced distributed data commit to improve write throughput.
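To make the region-granularity splitting concrete, here is a hypothetical sketch built on Flink's InputSplit abstraction: each split carries the key range of one TiKV region, so the number and boundaries of splits follow the physical data distribution rather than the primary key range. The class and field names are illustrative and are not the real connector's API.

```java
import org.apache.flink.core.io.InputSplit;

// One input split per TiKV region; the reader assigned to this split scans
// only the rows whose keys fall inside [regionStartKey, regionEndKey).
public class RegionInputSplit implements InputSplit {
    private final int splitNumber;
    private final byte[] regionStartKey;
    private final byte[] regionEndKey;

    public RegionInputSplit(int splitNumber, byte[] regionStartKey, byte[] regionEndKey) {
        this.splitNumber = splitNumber;
        this.regionStartKey = regionStartKey;
        this.regionEndKey = regionEndKey;
    }

    @Override
    public int getSplitNumber() {
        return splitNumber;
    }

    public byte[] getRegionStartKey() {
        return regionStartKey;
    }

    public byte[] getRegionEndKey() {
        return regionEndKey;
    }
}
```

Since regions are bounded in size, handing one split per region to the parallel readers keeps their load roughly balanced without any knowledge of the table's schema.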
3. Finally, the new platform provides data integration capability for Redis, which is widely used within Zhihu. The Flink community already has a Redis connector, but it only supports writes and makes it hard to flexibly customize the keys being written. We therefore developed a new Redis connector based on our own requirements, supporting Redis as both a Source and a Sink.
To avoid affecting online transactions during data extraction, the read path uses Redis's native master/replica mechanism to obtain and parse RDB files, achieving a per-instance extraction throughput of about 150 MB per second. Moreover, thanks to access to the internal storage systems' metadata, we can not only extract data from Redis clusters in sharded mode, but also select only the replica node of each shard as the extraction source, sparing the master nodes from extraction pressure.
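On the write side, the key point is that the record-to-key mapping stays under the job author's control, which the stock connector made difficult. Below is a minimal sketch of such a sink, assuming records arrive as (key, value) string pairs and using the Jedis client; it is an illustration of the idea, not the in-house connector itself.

```java
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import redis.clients.jedis.Jedis;

// Writes (key, value) pairs to Redis; the caller decides the key layout upstream.
public class RedisKvSink extends RichSinkFunction<Tuple2<String, String>> {
    private final String host;
    private final int port;
    private transient Jedis jedis;

    public RedisKvSink(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public void open(Configuration parameters) {
        // One connection per parallel sink instance.
        jedis = new Jedis(host, port);
    }

    @Override
    public void invoke(Tuple2<String, String> record, Context context) {
        jedis.set(record.f0, record.f1);
    }

    @Override
    public void close() {
        if (jedis != null) {
            jedis.close();
        }
    }
}
```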
This comprehensive evolution to Flink has solved many problems of the previous-generation data integration platform and brought significant benefits.
- In terms of throughput, replacing the MapReduce mode with Flink reduced the end-to-end scheduling delay from minutes to around 10 seconds. With the same data volume and the same Flink resources, the native TiDB connector achieves four times the throughput of JDBC.
- Functionally, the new platform natively supports integration tasks over sharded databases and tables, and avoids data skew in a business-agnostic way.
- In terms of data source support, we gained TiDB, Zetta, Redis and Pulsar support at very low cost, and as Flink's ecosystem matures there will surely be more out-of-the-box connectors for us to use.
- In terms of cost, decommissioning the dedicated MySQL synchronization nodes and the improved resource efficiency from the unified K8s resource pool brought significant savings in both cost and management.
4. Flink is the future
Looking back, the return on investment of this full evolution to Flink is very high, which further strengthens our confidence that "Flink is the future". At present, beyond data integration scenarios, Flink is also applied in the timeliness analysis of search queries, the data processing of commercial advertisement clicks, and the real-time data warehouse for key business metrics.
In the future, we hope to further expand Flink's application scenarios at Zhihu and build a more comprehensive real-time data warehouse and a systematic online machine learning platform. We also hope to realize unified batch and stream processing, so that large batch jobs for reporting and ETL can run on the Flink platform as well.
Given how Zhihu's big data system is built and its overall resource investment, consolidating the technology stack onto Flink is a very suitable choice for Zhihu going forward. As users, we look forward to witnessing the realization of Flink's unified batch and stream processing. And as members of the community, we hope to contribute to this goal in our own way.