Qi Zhi @pingcap

With the rapid development of the Internet, business types and business data keep multiplying. Once data reaches a certain scale, traditional storage structures gradually fail to meet enterprise needs, and a real-time data warehouse becomes a necessary basic service. Take dimension table Join as an example: data is stored in normalized tables in the business data source, so analysis requires a large number of Join operations, which hurts performance. If the Join can instead be performed as a stream during data cleaning and import, no Join is needed at analysis time, which improves query performance.

With a real-time data warehouse, enterprises can perform real-time OLAP analysis, real-time dashboards, real-time business monitoring, real-time data interface services, and more. However, many people’s first impression of a real-time data warehouse is that the architecture is complex and hard to operate and maintain. Thanks to the new Flink SQL support, as well as TiDB’s HTAP features, we explored an efficient and easy-to-use Flink + TiDB real-time data warehouse solution.

This article first introduces the concept of the real-time data warehouse, then describes the architecture and advantages of the Flink + TiDB real-time data warehouse, then presents some user scenarios already in production, and finally provides a demo in a docker-compose environment for readers to try.

The concept of the real-time data warehouse

The concept of a data warehouse was introduced in the 1990s by Bill Inmon: a subject-oriented, integrated, relatively stable collection of data that reflects historical changes, used to support management decisions. Data warehouses at the time collected data from data sources through message queues and performed daily or weekly computations for reporting purposes; they are therefore also known as offline data warehouses.

In the 21st century, as computing technology developed and overall computing power improved, the decision-making body gradually shifted from humans to computer algorithms. Demands such as real-time recommendation and real-time monitoring and analysis emerged, and the corresponding decision cycle shrank from days to seconds. In these scenarios, the real-time data warehouse was born.

Currently, real-time data warehouses mainly take one of three architectures: the Lambda architecture, the Kappa architecture, and the real-time OLAP variant architecture:

  1. The Lambda architecture superimposes a real-time data warehouse on top of the offline data warehouse, uses a streaming engine to process real-time data, and finally unifies the offline and online results for consumption.

  2. The Kappa architecture removes the offline data warehouse and produces data in real time throughout. This architecture unifies the computing engine and reduces development costs.

  3. As real-time OLAP technology advances, a new real-time architecture has been proposed, tentatively called the “real-time OLAP variant.” In simple terms, it shifts part of the computational load from the streaming engine to a real-time OLAP analysis engine, allowing more flexible real-time warehouse computations.

In summary, the Lambda architecture must maintain both a stream engine and a batch engine, so its development cost is higher than the other two. Compared with the Kappa architecture, the real-time OLAP variant can perform more flexible computations, but it relies on additional real-time OLAP computing resources. The Flink + TiDB real-time data warehouse solution introduced next belongs to the real-time OLAP variant architecture.

For a more detailed explanation of the real-time data warehouse and the comparison of these architectures, interested readers can refer to this article in the Flink Chinese community.

Flink + TiDB real-time data warehouse

Flink is a big data computing engine with low latency, high throughput, and unified stream and batch processing. It is widely used in scenarios with strict real-time requirements and offers important features such as exactly-once semantics.

With the integration of TiFlash, TiDB has become a true HTAP (Hybrid Transactional/Analytical Processing, i.e. OLTP + OLAP) database. In other words, in a real-time data warehouse architecture, TiDB can serve as the business database at the data-source end and handle business queries, and it can also serve as the real-time OLAP engine for analytical computation.

Combining the characteristics of Flink and TiDB, the advantages of the Flink + TiDB solution become clear. First, speed is guaranteed: both can scale out computing power horizontally by adding nodes. Second, learning and configuration costs are relatively low: TiDB is compatible with the MySQL 5.7 protocol, and the latest version of Flink allows tasks to be written and submitted entirely in Flink SQL through its powerful connectors, sparing users much of the learning curve.

The following are several commonly used building blocks for a Flink + TiDB real-time data warehouse. They address different needs and can be extended in actual use.

Use MySQL as the data source

By using flink-connector-mysql-cdc, officially provided by Ververica, Flink can not only collect the MySQL binlog to generate dynamic tables as the collection layer, but also serve as the stream computing layer, implementing streaming computations such as streaming Join and pre-aggregation. Finally, Flink writes the computed data into TiDB through the JDBC connector.

The advantage of this architecture is that it is very simple and convenient. Provided the corresponding databases and tables exist in MySQL and TiDB, you can complete task registration and submission by writing nothing but Flink SQL. You can try this architecture in the “Try it in docker-compose” section at the end of this article.
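As a minimal sketch of this path, assuming MySQL has a `test.orders` table and TiDB has a matching table (hostnames, credentials, and all table and column names are illustrative):

```sql
-- Source: expose the MySQL binlog as a dynamic table.
CREATE TABLE orders_src (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'mysql',
  'port' = '3306',
  'username' = 'root',
  'password' = '',
  'database-name' = 'test',
  'table-name' = 'orders'
);

-- Sink: the pre-created TiDB table, reached through the JDBC
-- connector (TiDB speaks the MySQL protocol, here on port 4000).
CREATE TABLE orders_sink (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2),
  PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://tidb:4000/test',
  'table-name' = 'orders',
  'username' = 'root',
  'password' = ''
);

-- Submitting this statement starts the continuous synchronization job.
INSERT INTO orders_sink SELECT * FROM orders_src;
```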

Connect Flink to Kafka

If the data has already been stored in Kafka by other means, Flink can easily obtain it through the Flink Kafka connector.

If you want to store the change logs of MySQL (or other data sources) in Kafka for subsequent processing by Flink, Canal or Debezium is recommended for collecting them. Flink 1.11 natively supports parsing the changelog formats of both tools, so there is no need to implement an additional parser.
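As a sketch, a changelog topic written by Canal can be declared as follows (topic, servers, and columns are illustrative); for Debezium, only the format option changes:

```sql
-- A Kafka topic carrying a Canal-formatted MySQL changelog,
-- interpreted by Flink as an updating dynamic table.
CREATE TABLE user_behavior (
  user_id  BIGINT,
  item_id  BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = 'kafka:9092',
  'properties.group.id' = 'flink-demo',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json'  -- use 'debezium-json' for Debezium logs
);
```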

Use TiDB as the data source

TiCDC is TiDB’s incremental data replication tool, implemented by pulling TiKV change logs. It can output TiDB change data to a message queue, from which Flink can extract it.

As of version 4.0.7, you can connect to Flink through the TiCDC Open Protocol. In later versions, TiCDC will also support direct output in the canal-json format for Flink’s use.
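On the TiCDC side, a changefeed that streams TiDB changes into Kafka might be created as in the sketch below; the PD address, topic name, and sink-uri parameters are illustrative, so check the TiCDC documentation for the exact options:

```shell
cdc cli changefeed create \
    --pd=http://pd0:2379 \
    --sink-uri="kafka://kafka:9092/tidb-changes?protocol=default"
```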

Cases and practice

The previous section covered some basic building blocks, but explorations in practice tend to be more complex and interesting. This section introduces some representative and illuminating user cases.

Xiaohongshu (Little Red Book)

Xiaohongshu is a lifestyle platform for young people, where users record their lives through short videos, pictures, and text, share their lifestyles, and interact with each other around shared interests. As of October 2019, Xiaohongshu had over 100 million monthly active users and was continuing to grow rapidly.

In Xiaohongshu’s business architecture, both Flink’s data source and its data sink are TiDB, achieving an effect similar to a “materialized view”:

  1. The online business tables perform normal OLTP tasks.
  2. A TiCDC cluster extracts real-time change data from TiDB and passes it to Kafka as a changelog.
  3. Flink reads the changelog from Kafka and performs computations, such as assembling wide tables or aggregate tables.
  4. Flink writes the results back into a wide table in TiDB for subsequent analysis (see the sketch below).
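As a minimal sketch of steps 3 and 4, assuming the TiCDC changelog lands in Kafka in a format Flink can parse (for example canal-json, as above) and is registered as source tables `orders` and `payments`, with `wide_orders` as a JDBC sink table pointing at TiDB; all names are illustrative:

```sql
-- Assemble a wide table from two changelog streams; Flink maintains
-- the join state and emits an update whenever either side changes.
INSERT INTO wide_orders
SELECT o.order_id, o.user_id, o.amount, p.pay_time, p.pay_channel
FROM orders AS o
JOIN payments AS p ON o.order_id = p.order_id;
```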

The whole process forms a closed loop around TiDB: the Join work of subsequent analysis tasks is moved into Flink and absorbed by streaming computation. This solution currently supports Xiaohongshu’s content review, note label recommendation, and growth audit businesses; it has withstood high-throughput online traffic and continues to run stably.

Beike Finance

Beike Finance has cultivated the housing scene for many years, accumulating rich big data on Chinese real estate. Driven by fintech, it applies AI algorithms to massive multidimensional data to improve the product experience and provide users with rich, customized financial services.

In the data services of Beike Finance’s data group, Flink real-time computation is used for a typical dimension table Join:

  1. First, Syncer (a lightweight tool for synchronizing data from MySQL to TiDB) collects the dimension table data from the business data source and synchronizes it into TiDB.
  2. Then Canal collects the binlog of the stream (fact) table in the business data source and stores it in the Kafka message queue.
  3. Flink reads the change log of the stream table from Kafka and performs a streaming Join; whenever data from the dimension table is needed, it is looked up in TiDB (see the sketch below).
  4. Finally, Flink writes the assembled wide table into TiDB for data analysis services.
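Step 3 is what Flink SQL calls a lookup join. A minimal sketch, assuming `loans` is the Kafka-backed stream table declared with a processing-time attribute and `user_dim` is a JDBC table pointing at the dimension table in TiDB; all names are illustrative:

```sql
-- The stream table must declare a processing-time column, e.g.:
--   CREATE TABLE loans ( ..., proc_time AS PROCTIME() ) WITH ( ... );
-- FOR SYSTEM_TIME AS OF makes Flink look up the matching dimension
-- row in TiDB each time a fact row arrives.
INSERT INTO wide_loans
SELECT l.loan_id, l.user_id, l.amount, u.city, u.risk_level
FROM loans AS l
JOIN user_dim FOR SYSTEM_TIME AS OF l.proc_time AS u
ON l.user_id = u.user_id;
```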

With the above structure, the main tables of the data service can be joined in real time, and the server then only needs to query a single table. This system has penetrated every core business system of Beike Finance, and cross-system data acquisition is unified through the data group’s data services, saving business systems the work of developing APIs and writing in-memory aggregation code.

PatSnap

PatSnap is a global patent database covering 130 million patent records and 170 million chemical structure records from 116 countries, dating back to 1790. Users can search, browse, and translate patents and generate Insights patent analysis reports for patent value analysis, citation analysis, legal search, and 3D patent maps.

PatSnap replaced its Segment + Redshift architecture with one based on Kinesis + Flink + TiDB.

The original Segment + Redshift architecture only built an ODS layer, with no control over data writing rules or schemas. On top of that, complex ETL had to be written over the ODS layer to compute the various indicators the business required. Large volumes of data landed in Redshift, computation was slow (T+1 freshness), and the performance of external services suffered.

After switching to the real-time data warehouse architecture based on Kinesis + Flink + TiDB, an ODS layer is no longer needed. Flink, as the pre-computing unit, builds the ETL jobs directly from the business, fully controlling the landing rules and defining the schema. Only the indicators the business cares about are cleaned and written into TiDB for subsequent analysis and queries, which greatly reduces the volume of written data. DWD/DWS/ADS layers are then built on TiDB around the indicators of interest, such as user/tenant, region, and business action, combined with time windows of different granularities (minute, hour, day), directly serving the business’s statistics and list requirements. Upper-layer applications can use the constructed data directly and obtain second-level real-time capability.
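For instance, the minute-granularity part of such a DWS layer could be fed by a windowed aggregation like the following sketch, assuming `user_actions` is an append-only source table with an event-time attribute and `dws_actions_1min` is a JDBC sink into TiDB; all names and metrics are illustrative:

```sql
-- Pre-aggregate per tenant into one-minute tumbling windows. The
-- source table declares an event-time watermark, e.g.:
--   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
INSERT INTO dws_actions_1min
SELECT
  tenant_id,
  TUMBLE_START(event_time, INTERVAL '1' MINUTE) AS window_start,
  COUNT(*) AS action_cnt
FROM user_actions
GROUP BY tenant_id, TUMBLE(event_time, INTERVAL '1' MINUTE);
```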

User experience: with the new architecture, the volume of incoming data, the write rules, and the computational complexity have all been greatly reduced. Data is processed in Flink jobs according to service requirements and written into TiDB, so T+1 ETL over a full Redshift ODS layer is no longer required. Thanks to reasonable data layering, the real-time data warehouse built on TiDB is greatly simplified in architecture, and development and maintenance become easier. Query, update, and write performance have all improved substantially; ad-hoc analysis no longer needs to wait for Redshift-style precompilation; and scaling out is convenient and development is simple. This architecture is now going live and is used inside PatSnap to analyze and track user behavior, supporting company operation dashboards, user behavior analysis, tenant behavior analysis, and other functions.

NetEase Interactive Entertainment

NetEase officially established its online games division in 2001. After nearly 20 years of development, it has become one of the top seven game companies in the world, ranking second in App Annie’s “Top 52 Global Publishers 2020” list.

In the application architecture of NetEase Interactive Entertainment’s billing group, Flink on the one hand writes business data sources into TiDB in real time; on the other hand, TiDB serves as the analysis data source for subsequent real-time stream computation in the Flink cluster, which generates analysis reports. In addition, NetEase Interactive Entertainment has internally developed a Flink job management platform to manage the entire lifecycle of jobs.

Zhihu

Zhihu is a comprehensive Chinese Internet content platform, with “let everyone obtain reliable answers efficiently” as its brand mission and North Star. As of January 2019, Zhihu had more than 220 million users and 130 million answers.

As a partner of PingCAP and a deep user of Flink, Zhihu developed a set of TiDB + Flink interaction tools and contributed it to the open source community as pingcap-incubator/TiBigData. It mainly provides the following functions:

  1. TiDB as a Flink source connector, for batch data synchronization.
  2. TiDB as a Flink sink connector, implemented on top of JDBC.
  3. A Flink TiDB catalog, so that TiDB tables can be used directly in Flink SQL without declaring them again (see the sketch below).
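For the catalog, usage might look roughly like the sketch below. The catalog type and option names here are assumptions rather than the verified TiBigData API, so consult the project README for the exact syntax:

```sql
-- Hypothetical registration of a TiDB catalog (option names are
-- assumptions, not verified against TiBigData):
CREATE CATALOG tidb WITH (
  'type' = 'tidb',
  'tidb.database.url' = 'jdbc:mysql://tidb:4000/',
  'tidb.username' = 'root',
  'tidb.password' = ''
);

-- Once registered, existing TiDB tables can be queried without DDL:
SELECT * FROM tidb.test.orders;
```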

Try it in docker-compose

To help readers try this out, we provide a MySQL + Flink + TiDB test environment based on docker-compose at github.com/LittleFall/…

The Flink + TiDB real-time data warehouse slides provide a simple tutorial for this environment, including concept explanations, code examples, basic principles, and caveats. The examples include:

  1. A first try of Flink SQL
  2. Using Flink to import data from MySQL into TiDB
  3. Double-stream Join
  4. Dimension table Join

After docker-compose starts, you can write and submit Flink tasks through the Flink SQL client and observe task execution at localhost:8081.

If you are interested in (or puzzled by) the Flink + TiDB real-time data warehouse solution, or have experience you would like to share from your own exploration and practice, you are welcome to share it in the TiDB community (e.g. AskTUG), the Flink community (e.g. the Flink Chinese mailing list), or by email ([email protected]).

Further reading

Implementation solutions for typical ETL scenarios based on Flink, a Flink Chinese community discussion of the real-time data warehouse concept and Joins on streams

How We Use a Scale-Out HTAP Database for Real-Time Analytics and Complex Queries

How We Build an HTAP Database That Simplifies Your Data Platform, on the TiDB HTAP architecture and its application in data platforms

TiDB: A Raft-based HTAP Database

Flink SQL CDC is online! We summarized 13 production practice lessons, from the Flink Chinese community, on operations and production experience with Flink SQL CDC