Author | Cai Shi, head of the SF Express big data platform · Compiled by | Zhao (Flink community volunteer)
This article introduces SF Express's practical experience with real-time data warehousing, database CDC, and Hudi on Flink, along with how these capabilities have been productized. The article is divided into the following parts:
● SF Express business introduction
● Hudi on Flink
● Productization support
● Follow-up plan
1. SF Express Business
1.1 Application of SF Express Big Data
Let’s take a look at the panorama of SF Express’s big data business.
Big data platform: the foundation in the middle is the big data platform itself, which SF Express built on top of open-source components. Above it sit big data analytics and artificial intelligence. SF Express has a very large ground force, namely couriers and transport vehicles working offline in the field, and AI and big data analytics are needed to assist their management and improve overall efficiency.
Blockchain: SF Express connects many customers and merchants. For merchants, the first requirement is that the delivery be trustworthy so that goods can be traded and exchanged. This mainly involves brand merchants, and SF Express is also involved in traceability and storage businesses.
IoT: as mentioned before, because SF Express has such a large ground force, there is correspondingly more data to collect. Some of our packages carry sensors, and the vehicles have sensors as well, such as onboard cameras; couriers wear bracelets (covering geographic location and employee health status, which drives corresponding care actions). There are also work scenarios involving forklifts and sorting equipment that need to be linked up through the big data platform, so IoT applications are quite common.
Smart supply chain and smart logistics are more about using big data to assist the business in making operational decisions. For example, we have many B-end customers; for them, how to stock each warehouse and how to coordinate and allocate goods between warehouses is handled by smart logistics.
The following shows part of our IoT practice:
As can be seen above, logistics itself involves many links, such as placing orders, picking up packages, sorting, land transport and transfer, and so on. The parts in red are applications that combine IoT with big data, and most of them are built on Flink.
1.2 SF Express Big Data Technology Matrix
The chart below gives an overview of the current overall architecture of SF Express's big data stack:
1. Data integration layer: the data integration layer sits at the bottom. Due to SF Express's history, it contains many data storage engines, such as Oracle, MySQL, and MongoDB, some of which will continue to be supported. The IoT devices at the lower right are relatively new; data acquisition covers ordinary text, network databases, images, audio, video, and so on.
2. Data storage and computation: the real-time part at SF Express is still Flink; Storm is not shown here, and we are currently migrating away from it. For message-oriented middleware we currently use Kafka. The storage side on the right is relatively rich, because different scenarios call for different approaches: data analysis, for example, requires the high-performance ClickHouse, while the data warehouse and offline computation are still relatively traditional, with Hive and Spark as the mainstays. At present we combine Flink and Hudi to bring real-time capability to the offline warehouse.
3. Data products: we prefer to lower the threshold first, so that internal developers and users can get started more easily. It would be very expensive for internal engineers to master so many components, and without standardization the communication, maintenance, and operations costs would be high, so we have to build productized and standardized tooling.
1.3 Composition of SF Express Data Collection
The figure above is an overview of our big data collection. Data collection currently covers microservice applications; some data is sent directly to Kafka, and some is written to logs. We built a log collection tool for the latter, similar to Flume but more lightweight, which achieves no loss, no duplication, remote updates, and rate limiting. In addition, we write Kafka data into HDFS via Flink, in Hudi format. More on this below.
1.4 SF Express Data Application Architecture
The figure above is a simple application architecture. As just mentioned, data from the big data platform is pushed to OLAP analysis engines and databases as required, and from there it reaches the data service platform. The data service platform mainly makes it easier for users and R&D to work with the databases: in the past, internal users first had to learn how to use the big data components, whereas now they simply configure query conditions and aggregation conditions in our data service product, and the result is generated as a RESTful interface that business systems can call directly. For example, an R&D user who needs to run a query only needs to care about the input and the output; the process in the middle does not need to be understood. This pushes the technical threshold down as far as possible and makes usage more efficient and simple.
In the middle, we built a gateway based on Kong, into which we can add a variety of common capabilities, including monitoring, rate limiting, caching, and so on.
GraphQL, on the right, is an open-source component from Facebook. Front-end requirements change frequently, and the back-end interfaces need to adjust accordingly; this is what GraphQL supports. There are actually two lines: Apollo and graphql-java. Apollo serves front-end developers, with Node.js used for the control layer; graphql-java is intended for back-end users and mainly provides the interfaces.
2. Hudi on Flink
2.1 Introduction to Hudi
Next, we introduce the practical application of Hudi on Flink at SF Express. Hudi's core advantages fall into two parts:
● First, Hudi provides update and delete capabilities on Hadoop, so its core value is being able to update and delete incrementally. This matters because privacy regulations at home and abroad impose strict requirements: deleting a single user's data in Hive is difficult and amounts to re-cleaning the data, whereas with Hudi you can quickly locate the record by its primary key and remove it.
● Second, time travel. We have many applications that need quasi-real-time computation. Previously, to work out the increment within the last half hour, or exactly what changed, you had to take the whole day's data and filter it. Hudi's time-travel capability lets you pull out just the increment with SQL-like syntax, and the downstream application can then update its business state directly from that data. This is Hudi's most important time-travel capability. (A minimal incremental-read sketch follows this list.)
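As a hedged illustration of this incremental pull, the sketch below reads a Hudi table through the Hudi Flink connector from a Java TableEnvironment and starts from a given commit instant instead of rescanning the whole table. The schema, path, and instant are hypothetical, and option names such as 'read.start-commit' differ slightly across Hudi versions.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiIncrementalReadSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Register a Hudi table and ask the connector to start reading from a
        // given commit instant instead of scanning the whole table.
        tEnv.executeSql(
            "CREATE TABLE waybill_hudi (" +
            "  waybill_id STRING," +
            "  status STRING," +
            "  update_time TIMESTAMP(3)," +
            "  PRIMARY KEY (waybill_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 'hdfs:///warehouse/hudi/waybill'," +
            "  'read.streaming.enabled' = 'true'," +      // incremental / streaming read
            "  'read.start-commit' = '20210101120000'" +  // only changes after this instant
            ")");

        // A downstream quasi-real-time job sees only the increment.
        tEnv.executeSql("SELECT * FROM waybill_hudi").print();
    }
}
```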
Hudi has two ways of writing:
● Copy on write: copy on write rewrites the file containing the record being updated, producing a new file version that includes the incremental change, which is equivalent to keeping the historical state. The downside is that write performance is somewhat weak, but read performance is strong and reading is much like using an ordinary Hive table, which is one of Hudi's virtues. Freshness is slightly lower, depending partly on how often the written files are merged, but since writes are batched anyway (for example, a commit every few minutes), this does not incur a high performance cost. This is copy on write.
● Merge on read: merge on read writes updates to delta log files and merges the logs with the base files when reading. The advantage is that data is up to date at query time, but because there are really a lot of query tasks, every query has to take out both parts of the data and merge them, so it also incurs a cost. (A table-definition sketch for both modes follows this list.)
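The following sketch shows how the two write modes might be declared through the Hudi Flink SQL connector, where 'table.type' selects copy on write or merge on read; the schema and paths are again hypothetical.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiTableTypesSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Copy on write: updates rewrite base files -> slower writes, Hive-like reads.
        tEnv.executeSql(
            "CREATE TABLE orders_cow (" +
            "  order_id STRING, amount DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi', 'path' = 'hdfs:///hudi/orders_cow'," +
            "  'table.type' = 'COPY_ON_WRITE')");

        // Merge on read: updates go to delta logs and are merged with base files at read time.
        tEnv.executeSql(
            "CREATE TABLE orders_mor (" +
            "  order_id STRING, amount DOUBLE, PRIMARY KEY (order_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi', 'path' = 'hdfs:///hudi/orders_mor'," +
            "  'table.type' = 'MERGE_ON_READ')");
    }
}
```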
The above is a brief introduction to Hudi.
2.2 Hudi on Flink Components – Real-Time Database Ingestion
This is how we make database data real-time with CDC. Database CDC is basically only available at database-level granularity, so the upstream source is at database granularity, with two processing paths in between:
● One part is DML, which is filtered. When a database has 100 tables, many of them are usually not needed, so we filter that part out directly; the filtering is mainly configured through the productized tooling.
● The other part is DDL, which keeps the schema updated in real time. For example, when a field is added or changed in a database table, or a table is added or changed, the real-time program publishes the change through a metadata channel. Whenever anything changes, a new schema version is generated and the metadata is recorded in the channel; at the same time, every record written to the binlog Kafka sink is tagged with the corresponding version number. Downstream consumers can then match each record directly to its schema, which is very convenient and avoids errors. (A hedged sketch of this routing follows this list.)
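Below is a hedged, simplified sketch of this routing idea in a Flink DataStream job: DML records of unsubscribed tables are dropped, and every record is tagged with a schema version before heading to the binlog Kafka sink. The class and field names are illustrative, not SF Express's actual code.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CdcRoutingSketch {

    /** Simplified binlog event; a real record would carry the full row image. */
    public static class BinlogEvent {
        public String table;
        public String type;     // "DML" or "DDL"
        public String payload;
        public BinlogEvent() {}
        public BinlogEvent(String table, String type, String payload) {
            this.table = table; this.type = type; this.payload = payload;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Tables that downstream users actually subscribed to (configured via the product).
        Set<String> subscribedTables = new HashSet<>(Arrays.asList("orders", "waybill"));

        // Stand-in data; in production this stream comes from the library-level CDC connector.
        env.fromElements(
                new BinlogEvent("orders", "DML", "{...}"),
                new BinlogEvent("tmp_audit", "DML", "{...}"),
                new BinlogEvent("orders", "DDL", "ALTER TABLE orders ADD COLUMN remark STRING"))
            // Drop DML for tables nobody subscribed to; DDL always passes through.
            .filter(e -> !"DML".equals(e.type) || subscribedTables.contains(e.table))
            // Tag every record with its schema version; in the real pipeline a DDL event
            // bumps the version and publishes the new schema to the metadata channel.
            .map(e -> e.table + "@schema-v1:" + e.payload)
            .returns(Types.STRING)
            .print();  // stand-in for the binlog Kafka sink

        env.execute("cdc-routing-sketch");
    }
}
```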
2.3 Hudi on Flink Components – Real-Time Data Warehouse
In this section we share how we made the data warehouse real-time. Our goal is to make the data in Kafka available to the offline data warehouse, including to the many users doing quasi-real-time computation. Hudi on Flink is the solution we tried. Hudi previously offered a Hudi on Spark solution, which is what the community officially recommends, but that would effectively mean maintaining one more component, and our general direction is that everything real-time should be done by Flink. In this part we really did put it into production.
The whole process, for example making one table real-time, is divided into two parts. One part is initialization: when the job starts, the stock data is pulled in batch. Flink Batch is used for this, and the community itself also provides the capability. In addition, Hudi can convert an existing Hive table into a Hudi table, which is a new Hudi feature. For tables with stock data, they can be converted directly from the stock table and then initialized with Flink Batch.
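As a hedged sketch of the initialization step, the snippet below runs a bounded Flink Batch job that copies the stock Hive table into the Hudi table once, after which the streaming job takes over; the catalog, table names, and paths are hypothetical.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class HudiBootstrapSketch {
    public static void main(String[] args) {
        // Bounded (Flink Batch) execution for the one-off load of stock data.
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());

        // Existing Hive tables become visible through a Hive catalog.
        tEnv.executeSql(
            "CREATE CATALOG hive_cat WITH ('type' = 'hive', 'hive-conf-dir' = '/opt/hive/conf')");

        // Hudi target table (connector options trimmed to the essentials).
        tEnv.executeSql(
            "CREATE TABLE ods_waybill_hudi (" +
            "  waybill_id STRING, status STRING, PRIMARY KEY (waybill_id) NOT ENFORCED" +
            ") WITH (" +
            "  'connector' = 'hudi'," +
            "  'path' = 'hdfs:///warehouse/hudi/ods_waybill')");

        // One-off batch copy of the stock Hive table into Hudi.
        tEnv.executeSql(
            "INSERT INTO ods_waybill_hudi " +
            "SELECT waybill_id, status FROM hive_cat.ods.waybill");
    }
}
```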
The incremental part is the database connected to Kafka. At the same time, Flink's own checkpoint mechanism (whose overall frequency can be controlled) drives the snapshot process; since we can control what happens at each checkpoint, all the upsert operations that Hudi needs are committed in the form of checkpoints, finally forming real-time data in Hudi.
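A minimal sketch of this incremental path, assuming a JSON binlog topic and a Hudi target table (topic, schema, and paths are hypothetical): the Kafka source is continuously upserted into Hudi, and Flink's checkpoint interval controls how often Hudi commits.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class KafkaToHudiSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inStreamingMode().build());

        // Hudi commits are driven by checkpoints, so the commit cadence is set here.
        tEnv.getConfig().getConfiguration()
            .setString("execution.checkpointing.interval", "3min");

        // Binlog records from Kafka (JSON for simplicity).
        tEnv.executeSql(
            "CREATE TABLE waybill_binlog (" +
            "  waybill_id STRING, status STRING, update_time TIMESTAMP(3)" +
            ") WITH (" +
            "  'connector' = 'kafka', 'topic' = 'binlog_waybill'," +
            "  'properties.bootstrap.servers' = 'kafka:9092'," +
            "  'scan.startup.mode' = 'latest-offset', 'format' = 'json')");

        // Hudi target table keyed by the waybill id.
        tEnv.executeSql(
            "CREATE TABLE ods_waybill_hudi (" +
            "  waybill_id STRING, status STRING, update_time TIMESTAMP(3)," +
            "  PRIMARY KEY (waybill_id) NOT ENFORCED" +
            ") WITH ('connector' = 'hudi', 'path' = 'hdfs:///warehouse/hudi/ods_waybill')");

        // Each checkpoint flushes the buffered upserts as one Hudi commit.
        tEnv.executeSql("INSERT INTO ods_waybill_hudi SELECT * FROM waybill_binlog");
    }
}
```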
2.4 Hudi Data Warehouse Wide-Table Solution
Throwing Kafka data directly into Hudi is relatively easy; the real difficulty lies in wide tables. For a wide table, many dimension tables are involved, and when the dimension tables or fact tables are updated, multiple fact tables need to be joined together. But not every fact table carries the true primary key of the wide table, so Hudi cannot perform the update on its own. How to make the wide table real-time is therefore the hard problem.
The picture above shows SF Express's wide-table solution.
● The first layer, ODS, can be connected directly to Kafka using the Hudi on Flink framework.
● The second layer, DWD, also has two approaches. One is to use Flink SQL to first build a real-time wide table in Kafka, but the cost is a bit higher: it introduces Kafka again and the whole data link becomes longer. If a truly real-time wide table is needed, a small portion can be pushed this way, but if there is no demand for purely real-time data, there is no need to build a real-time Kafka wide table for DWD. Without it, how do we make DWD real-time in the offline layer? There are a few steps: first, create a dimension-table UDF to do the table association, which is also the most convenient way; second, two real-time tables can be joined directly, although the join may not always be possible.
Of course, dimension-table association involves the mapping between foreign keys and primary keys. This mapping lets us quickly find which wide-table primary key is affected when another fact table is updated. Related to it is the primary key index: it is equivalent to building an extra lookup table keyed by the primary key, so that incremental updates to the Hudi wide table work basically the same way as at the ODS layer. That is the real-time processing flow for wide tables. The figure below shows an example of a waybill wide table.
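As a hedged sketch of the "dimension table UDF" idea mentioned above, the scalar function below looks up a dimension attribute by key so the fact stream can be widened without introducing another Kafka layer; the cache and lookup details are purely illustrative.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.ScalarFunction;

/** Illustrative dimension-lookup UDF; a real one would be backed by HBase/Redis/JDBC. */
public class DimLookupFunction extends ScalarFunction {

    private transient Map<String, String> cache;

    @Override
    public void open(FunctionContext context) throws Exception {
        // In production this cache would be loaded from the dimension store
        // and refreshed periodically.
        cache = new ConcurrentHashMap<>();
    }

    public String eval(String dimKey) {
        // Look up (and lazily default) the dimension attribute for this key.
        return cache.computeIfAbsent(dimKey, k -> "unknown");
    }
}

// Registration and use from SQL, e.g.:
//   tEnv.createTemporarySystemFunction("dim_city", DimLookupFunction.class);
//   SELECT waybill_id, dim_city(city_code) AS city_name FROM dwd_waybill;
```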
3. Productization support
The above analyzed SF Express's current business architecture from a technical perspective. Below we share some of the supporting work we did during this transition.
3.1 Data Express
The figure above is our Data Express, which lets users operate in the product by themselves without writing code, achieving fast, simple, low-threshold data access; configuring a data source takes only about one minute. The whole process is to configure the data access in the product through to the target store, after which our offline tables, data warehouse, and data analysis can use the data directly and quickly.
In addition, after data is connected, we need data management capabilities. The figure above shows a simple view of the data management capabilities in the test environment. We need to let users manage the relevant data: first, who uses it; then which fields it involves, what its specific content is, and what its lineage looks like.
3.2 Use of real-time data
The figure above shows the binlog SDK. The binlog is in Avro format, which has a certain threshold for users. Some users do write code, and for them we provide a dedicated SDK so the data is easy to consume. What looks like JSON on the left is actually the Avro format; the right-hand side is Java. This is a tool for quickly developing applications at the code level.
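The SDK itself is internal, so the snippet below is only a generic sketch of what decoding one Avro-encoded binlog record looks like, assuming the writer schema is available to the consumer.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DecoderFactory;

public class BinlogAvroDecodeSketch {

    /** Decode one binary Avro payload (as it arrives from the binlog Kafka topic). */
    public static GenericRecord decode(byte[] payload, Schema writerSchema) throws Exception {
        DatumReader<GenericRecord> reader = new GenericDatumReader<>(writerSchema);
        return reader.read(null, DecoderFactory.get().binaryDecoder(payload, null));
    }
}
```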
We’ve also done some simplification on the platform, starting with a section on drag and drop, which encapsulates components that users can drag and drop to quickly fulfill their needs. When the product went live, many users who had no prior experience in real-time computing, or even offline development, were able to do real-time data development.
Flink itself provides a lot of metrics, and users also have a lot of metrics. We wanted to provide users with an efficient solution that collects all the metrics and allows them to quickly apply them.
We have also done several pieces of work on monitoring. One is a crawler scheme that implements an Akka client: Flink itself is built on the Akka framework, and each JobManager exposes Akka services and interfaces, so specific metrics can be fetched through Akka's API. After collection, the metrics are sent to Kafka and finally stored in TDengine, with Grafana serving them to users; Grafana is also integrated into our real-time computing platform. This approach makes it possible to collect metrics from existing jobs without restarting the user's task.
For incremental needs, however, some metrics have to be added, such as CPU usage and memory usage. This part is covered by the reporter scheme, which is also the main approach promoted by the community. The idea of the reporter solution is to develop a plugin against Flink's metrics reporter interface and send the metrics to a gateway; the gateway exists to avoid the problem of too many Kafka clients, so one is placed in the middle. This is the monitoring situation for Flink tasks.
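A hedged skeleton of the reporter approach is shown below: Flink's MetricReporter interface is implemented as a plugin, registered in flink-conf.yaml, and each reported metric is forwarded to a gateway rather than having every task talk to Kafka directly. The gateway client and the config key are placeholders.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;
import org.apache.flink.metrics.reporter.Scheduled;

public class GatewayMetricReporter implements MetricReporter, Scheduled {

    private final Map<String, Metric> metrics = new ConcurrentHashMap<>();
    private String gatewayUrl;

    @Override
    public void open(MetricConfig config) {
        // e.g. metrics.reporter.gateway.url in flink-conf.yaml (hypothetical key)
        gatewayUrl = config.getString("url", "http://metric-gateway:8080");
    }

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String name, MetricGroup group) {
        metrics.put(group.getMetricIdentifier(name), metric);
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String name, MetricGroup group) {
        metrics.remove(group.getMetricIdentifier(name));
    }

    @Override
    public void report() {
        // Push the current snapshot to the gateway, which batches it into Kafka.
        metrics.forEach((id, m) -> send(gatewayUrl, id, m));
    }

    private void send(String url, String id, Metric metric) {
        // Placeholder: HTTP POST to the gateway.
    }
}
```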
4. Follow-up plan
The above has shared our internal implementation and practical application process. What will we do next?
4.1 Elastic Computing
First, elastic computing. Currently, for the tasks we monitor, users apply for far more resources than they actually need, resulting in serious waste of resources, including memory. To handle this, we use a Metrics Monitor framework extended from Flink: combined with the collected metrics, we can adjust in time, scaling resources or parallelism up or down when overall utilization is too low or too high.
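Purely as a toy sketch of the threshold idea, the snippet below turns the collected utilization numbers into a scaling decision; the thresholds and names are illustrative, not SF Express's actual rules.

```java
public class ElasticScalingSketch {

    enum Action { SCALE_UP, SCALE_DOWN, KEEP }

    /** Decide a scaling action from utilization gathered by the metric pipeline. */
    static Action decide(double cpuUtil, double heapUtil) {
        if (cpuUtil > 0.80 || heapUtil > 0.85) {
            return Action.SCALE_UP;    // persistently hot: add resources / parallelism
        }
        if (cpuUtil < 0.20 && heapUtil < 0.30) {
            return Action.SCALE_DOWN;  // clearly over-provisioned: shrink
        }
        return Action.KEEP;
    }

    public static void main(String[] args) {
        System.out.println(decide(0.12, 0.25));  // SCALE_DOWN for an idle job
    }
}
```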
4.2 Evolution of Replacing Hive with Flink
As mentioned above, we have a lot of Hive tasks, as well as Spark tasks, that need to be replaced. How do we do it?
First, we replace them with Flink. Since forcing the change, or relying purely on automatic platform recommendation, is difficult, we made a compromise. For example, when data is written to a Hive table, HiveServer parses the SQL and the target table is handled in two ways: one path executes normally and writes to Hive, while the other rewrites the target to another (shadow) table and runs the same logic on Flink at the same time. This incurs extra resource consumption and produces roughly two copies of the table, and we then automatically check whether the two tables are consistent. If the consistency test stays stable, the computation framework can be swapped.
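A hedged sketch of the automatic consistency check might look like the following, comparing row counts of the Hive-written table and the Flink-written shadow table before switching engines; catalog, database, and table names are hypothetical, and per-column checksums could be added the same way.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class DualRunConsistencySketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().inBatchMode().build());
        tEnv.executeSql(
            "CREATE CATALOG hive_cat WITH ('type' = 'hive', 'hive-conf-dir' = '/opt/hive/conf')");

        // Compare the original output table with the Flink-produced shadow table.
        tEnv.executeSql(
                "SELECT 'hive'  AS engine, COUNT(*) AS row_cnt FROM hive_cat.dw.result_table " +
                "UNION ALL " +
                "SELECT 'flink' AS engine, COUNT(*) AS row_cnt FROM hive_cat.dw.result_table_shadow")
            .print();
    }
}
```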
Most tasks are compatible and replaceable, but a small portion are not; those can be handled manually to unify the whole technology stack, and this is work to be completed later.
4.3 Batch Stream Integration
The figure above shows the batch-stream integration process. Batch-stream integration has already been implemented in metadata management and permission management.
Beyond that, combined with the replacement process just mentioned, the figure above shows the SQL compatibility test. Once all of this is done, batch-stream integration can proceed in step: with the same interface, adding one parameter enables a quick switch of the underlying engine between stream and batch processing, helping keep the whole data development experience consistent. So batch-stream integration is also something we need to try later.
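A minimal sketch of this "same SQL, one switch" idea: the submission layer flips only the runtime mode while everything else stays identical. This is an assumption about how such a switch could be wired, not the platform's actual code.

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UnifiedSubmitSketch {

    /** Same SQL and catalogs; one flag decides whether it runs as a stream or a batch job. */
    public static TableEnvironment create(boolean streaming) {
        EnvironmentSettings settings = streaming
                ? EnvironmentSettings.newInstance().inStreamingMode().build()
                : EnvironmentSettings.newInstance().inBatchMode().build();
        return TableEnvironment.create(settings);
    }
}
```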
The figure above is actually the final form of our integration of the whole framework. At the top is an IDE layer that all users can use. Below it are various supporting capabilities, including SQL syntax parsing with auto-completion, as well as resource management, scheduling management, and knowledge management, all of which aid development. The next layer down is the compute engines, which should be isolated from users so that they do not have to pay attention to the underlying implementation. This is what we will continue to work on in the future.