Overview: This article describes how Alibaba Cloud builds a Lakehouse on top of Apache Hudi and OSS object storage. It explains what a Lakehouse is, how the OLAP team in Alibaba Cloud's database business unit built one, the problems and challenges encountered along the way, and how they were solved.

Slides (PDF) for this talk: developer.aliyun.com/topic/downl… (Alibaba Cloud's practice of building a Lakehouse on Hudi).

I. Data Lake and Lakehouse

At the 2021 Developer Conference, one of our researchers gave a talk that cited a lot of figures. The main point is that at this stage of the industry's development, data is expanding dramatically and growing at an alarming rate, whether measured by sheer data volume, by the shift from real-time to intelligent production and processing, or by the accelerating migration of data to the cloud.

These figures come from Gartner and IDC, two of the most authoritative analyst reports in the industry. They mean that we face both great opportunities and great challenges in the data field, especially in analytics.

When it comes to massive data, there are many challenges in truly mining and using its value. First, existing architectures are gradually migrating to cloud architectures. Second, data volumes keep growing. Third, Serverless pay-as-you-go is slowly moving from a trial option to the default option. Fourth, applications and data sources are becoming more diverse and heterogeneous. Anyone who has worked with the cloud knows that every cloud vendor offers a wide range of services, especially data services. A large number of data sources inevitably makes analysis difficult, particularly cross-source association analysis: how to connect heterogeneous data sources is a big problem. There is also the issue of differentiated data formats. For writing, we usually choose convenient and simple formats such as CSV and JSON, but these formats are very inefficient for analysis; once data reaches the TB or PB level, they cannot be analyzed at all. This is where analysis-oriented columnar formats such as Parquet and ORC come in. Add link security, differentiated user groups, and so on, and the process of data expansion brings many analysis difficulties.

In real customer scenarios, a lot of data is already on the cloud and already "in the lake." What is a lake? Our definition and understanding of a lake is closer to AWS S3 or Alibaba Cloud OSS object storage: a simple, easy-to-use API that can store a wide variety of data formats, with unlimited capacity, pay-as-you-go pricing, and many other benefits. In the past, analysis on top of the lake was very troublesome. Most of the time we had to build T+1 data warehouses and wire up various cloud services; sometimes, if the data format was wrong, people had to do ETL; even if the data was already in the lake, we still had to do metadata discovery and analysis. The whole operation and maintenance chain was very complicated and full of problems. These are some of the real offline data lake problems our customers face: some are high priority, some are low priority, but all in all there are a lot of them.

Databricks has been shifting its focus from Spark to Lakehouse since around 2019. It published two papers that provide a theoretical foundation for how data lakes can be accessed in a unified and better way.

Based on the new Lakehouse concept, what we want to do is shield all the differences in formats and provide unified interfaces and simpler data access and analysis capabilities to different applications. In architectural terms, this is the step-by-step evolution from data warehouse to data lake to Lakehouse.

The two papers explain many new concepts. First, how to design and implement MVCC so that an offline data warehouse can also have database-like MVCC capabilities and satisfy most batch-transaction needs. Second, how to provide different storage modes that adapt to different read and write workloads. Third, how to provide near-real-time write and merge capabilities so the pipeline can keep up with growing data volumes. In short, the idea is a better way to solve the problems of offline data analysis.

Three products are relatively popular in the industry. The first is Delta Lake, Databricks' own data lake management protocol. The second is Iceberg, an Apache open-source project. The third is Hudi, originally developed internally by Uber and later open-sourced (Hive ACID was widely used in the early days). All three can sit on top of lake storage through the HDFS API, and OSS can be adapted to the HDFS storage interface. Because their core principles are similar, the three products are converging in capability, and with the theoretical backing of the papers we had a clear direction to put into practice.

For us, we chose Hudi at the time because of its product maturity and its focus on ingesting database data into the lake, which fit our database team's CDC requirements.

The early expansion of Hudi was "Hadoop Updates anD Incrementals," later broadened to cover Update, Delete, and Insert on Hadoop. Its core logic is transaction versioning, state-machine control, and asynchronous execution, which mimics MVCC. It incrementally manages the underlying storage files, such as Parquet and ORC, and their object listings to achieve efficient storage reads and writes. This is similar to Databricks' Lakehouse concept, and Iceberg is gradually improving in the same direction.
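As a quick illustration of that versioning model, every record in a Hudi table carries hidden metadata columns (commit time, record key, file name, and so on) on which the MVCC-style timeline is built. A minimal sketch in Spark/Scala, assuming a Hudi table already exists at a hypothetical OSS path:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-meta-demo").getOrCreate()

// Load a Hudi table like any other Spark data source.
val df = spark.read.format("hudi")
  .load("oss://my-bucket/lakehouse/orders")   // hypothetical path

// Hudi-managed metadata columns expose the commit (version) each row belongs to.
df.select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name")
  .show(5, truncate = false)
```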

The official Hudi website presents a framework like this. When we were doing technology selection and research, we found that many peers had also chosen Hudi for data ingestion into the lake and offline data management. First, the product is relatively mature. Second, it fits our CDC requirements. Third, Delta Lake has an open-source version and an internally optimized version, and with only the open-source version available we did not think we would necessarily get the best of it. Iceberg started later and, compared with the other two, was not considered at that early stage. Since we are a Java team and have our own Spark product, Hudi fit well with our plan to support data ingestion on our own runtime, so we chose Hudi.

Of course, we have kept watching how the three products develop. Later, StarLake, a domestic open-source project, started doing similar things. Every product is making progress; in the long run their capabilities are basically converging, and I think they will gradually match the capabilities defined in the papers.

Our overall approach: with open-source Hudi's columnar, versioned format as the foundation, ingest heterogeneous data incrementally and with low latency into the lake; store it in open, low-cost object storage; optimize the data layout and evolve the metadata along the way; and ultimately achieve unified management of offline data that supports the computation and analysis layers above without distinction. This is our understanding of Lakehouse and the technical direction we are taking.

II. Alibaba Cloud Lakehouse Practice

The following is a brief introduction to Alibaba Cloud's Lakehouse technical exploration and concrete practice. First, let me introduce the "integration of database, warehouse, and lake" strategy that the Alibaba Cloud database team has been advocating in recent years.

We all know that data products fall into four levels: first, databases (DB); second, NewSQL/NoSQL products; third, data warehouse products; fourth, data lake products. Going up the stack, the value density of the data increases and analysis is done over well-modeled tables and warehouse schemas; DB data formats, for example, are very simple and clear. Going down the stack, data volumes become larger and larger and data forms more complex: there are all kinds of storage formats, and the data lake holds structured, semi-structured, and unstructured data that must be extracted and mined to some degree before its value can really be used.

Each of the four storage directions has its own domain, but there is also a demand for associated analysis across them; the main goal is to break down data silos and integrate data so that its value becomes more multidimensional. If you only do some log analysis, such as correlating by region or customer source, you only need relatively simple capabilities like GroupBy or Count. For the underlying data, multiple rounds of cleansing and backflow may be required to move it into online, highly concurrent scenarios and analyze it layer by layer. Here data is not only written from the lake into databases, but also into warehouses, NoSQL/NewSQL products, and KV systems, to take advantage of their online query capabilities and so on.

Conversely, data in these databases, NewSQL products, and even warehouses also flows downward, building low-cost, large-capacity storage for backup and archiving, reducing storage and analysis throughput pressure, and forming a powerful joint analysis capability. This is my own understanding of the integration of database, warehouse, and lake.

We have just talked about the direction and positioning of databases; now let's look at where Lakehouse sits in OLAP's own layered data warehouse system. Many of you are probably more familiar with warehouse products than I am. As the slide shows, it is basically a layered system: at the bottom are data sources of many forms, in warehouses, lake systems, or elsewhere outside our reach. Through Lakehouse we ingest them into the lake and, through cleansing, consolidation, and aggregation, form the ODS or CDM layer, where preliminary aggregation and summarization are done, giving rise to the concept of a data mart.

On Alibaba Cloud we store this data on OSS, based on the Hudi protocol and the Parquet file format. Internally, we further aggregate the initial data sets through ETL into clearer, more business-oriented data sets, and then build further ETL to import them into real-time data warehouses and so on. Alternatively, these data sets directly serve low-frequency interactive analysis and BI analysis, or machine learning on engines such as Spark, and are finally delivered to data applications. That is the overall layered system.

Throughout this process, we connect to a unified metadata system. If each part of the system kept its own terminology and its own copy of the metadata, the OLAP architecture would be fragmented, so metadata must be unified, and the same is true for scheduling. Tables at different warehouse layers have to be chained together across different places, so there must be a complete, unified scheduling capability. That is my understanding of Lakehouse's positioning in the OLAP system: it mainly provides the ability to attach to the source layer and aggregate offline data.

Having introduced Lakehouse's position within the database and OLAP teams, I will now focus on how Lakehouse is designed in our field. Because I previously built cloud systems with K8s, I am quite familiar with many K8s concepts.

In our own design we also tried to reference and learn from the K8s system. K8s has the oft-mentioned DevOps concept, which is a practical paradigm: you create many instances, manage many applications within those instances, those applications are ultimately scheduled atomically as Pods, and inside the Pods run the business logic, in various containers.

We think Lakehouse is also a paradigm, a paradigm for working with offline data. In this paradigm the data set is the core concept; for example, you build a data set for a certain scenario or direction. We can define different data sets A, B, and C, analogous to instances. Around a data set we orchestrate various workloads, such as ingesting a DB into the lake; analysis-optimization workloads, such as index building, which can be improved with techniques like Z-ordering, Clustering, and Compaction; and management workloads, such as periodically cleaning up historical data and tiering hot and cold storage, since OSS provides many of these capabilities and we should use them well. At the bottom are the individual jobs: internally we build offline computing capacity on Spark, arrange the workloads into small jobs, and execute all the atomic jobs elastically on Spark.
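A hypothetical Scala sketch of this "data set plus workloads" paradigm; the type and field names are purely illustrative and not the actual Lakehouse API:

```scala
// Illustrative model only: a data set with the workloads orchestrated around it.
sealed trait Workload
case class DbIngestion(sourceDb: String, tables: Seq[String]) extends Workload          // full + incremental ingestion
case class Optimization(kind: String, sortColumns: Seq[String]) extends Workload        // e.g. clustering / z-ordering / compaction
case class Management(retentionDays: Int, coldStorageAfterDays: Int) extends Workload   // lifecycle / hot-cold tiering rules

case class Dataset(name: String, storagePath: String, workloads: Seq[Workload])

val ordersLake = Dataset(
  name = "orders_lake",
  storagePath = "oss://my-bucket/lakehouse/orders",   // hypothetical bucket
  workloads = Seq(
    DbIngestion(sourceDb = "rds-orders", tables = Seq("orders", "order_items")),
    Optimization(kind = "clustering", sortColumns = Seq("user_id")),
    Management(retentionDays = 365, coldStorageAfterDays = 90)
  )
)
// Each workload would then be compiled into one or more atomic Spark jobs and scheduled independently.
```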

This is the overall technical architecture. At the top are the various data sources on the cloud; through orchestration we define various workloads, which run on our own elastic Spark computation. The core storage is based on Hudi plus OSS, and we also support other HDFS-compatible systems, such as Alibaba Cloud's LindormDFS. An internal metadata system manages databases, tables, columns, and other metadata. All control services are scheduled on K8s. The upper layer connects computing and analysis capabilities through native Hudi interfaces. This is the whole elastic architecture.

Serverless Spark is our computing base and provides job-level elasticity; Spark also supports Spark Streaming, which lets us bring up a streaming Spark job in a short time. OSS and LindormDFS are chosen as the storage base to take advantage of their low cost and unlimited capacity.

Within this architecture, how do we connect to the user's data to provide ingestion, storage, and analysis? The figure above shows our VPC-based security solution. First, we run in a shared-cluster mode. The user side connects through the SDK and a VPDN network, and the internal Alibaba Cloud gateway then reaches the compute cluster for management and scheduling. In addition, Alibaba Cloud's elastic network interface (ENI) technology connects to the user's VPC for data access, providing routing and network isolation. Different users may have conflicting subnets, and the ENI technology allows identical subnets to connect to the same compute cluster.

Anyone who has used Alibaba Cloud OSS knows that OSS itself sits in a public, shared zone of Alibaba Cloud's VPC network and does not require complex networking, whereas RDS and Kafka are deployed inside the user's VPC, and multiple networks can be connected through a single network architecture. Compared with handling VPC subnet conflicts, the shared zone has no such problem. Second, data isolation: ENI carries end-to-end restrictions; for example, VPCs have ID tags and different authorization requirements, so attempts by unauthorized users to connect to the VPC are rejected.

With the network architecture settled, how do we actually run things? In the overall design we took the DSL design of K8s as a reference. As mentioned above, many ingestion tasks are defined, and a single workload may contain many small tasks. These orchestration scripts are submitted through the SDK, console, and so on, received by an API Server, and scheduled by a Scheduler. The Scheduler connects to Spark's gateway to implement task management, state management, and task dispatch, and internal K8s jobs are finally scheduled for execution. Some full-load jobs run once, such as a one-off DB pull, and there are also resident streaming jobs, asynchronously triggered jobs, scheduled asynchronous jobs, and so on: different forms of tasks under the same, extensible scheduling capability. During execution, job states are continuously fed back, lag statistics are collected, and so on. In K8s, the K8s Master plays this role together with the API Server and Scheduler; we are similar, achieving an HA scheduling mechanism through a master/multi-slave architecture.

Why do we split a workload-oriented task into N different jobs? If all these tasks ran in a single process, the resource water level of the whole workload would fluctuate widely and elastic scheduling would become very difficult. A full-load run is fine on its own, but how many resources are appropriate? In many cases Spark is not flexible enough, especially with asynchronous and scheduled tasks, because it is hard to predict when the next task will arrive. It is like the Fourier transform in signal processing: decomposing a complex waveform into multiple simple waveforms makes the processing easier. We have the same intuition: by using different jobs for the different roles in a workload, elasticity becomes easy to achieve. For example, when a scheduled or temporarily triggered job is pulled up, its resource consumption has nothing to do with the resident streaming task, so neither the stability of the streaming task nor the ingestion latency is affected at all. The thinking behind this design is to make a complex problem simple: from an elasticity point of view, the simpler the waveform, the better the elasticity and the easier the prediction.

Ingestion into the lake involves a lot of user credentials, because not all cloud products use systems such as AWS IAM or Alibaba Cloud RAM for fully cloud-based resource permission control. Many products, including user-built systems and database systems, still do authentication and authorization with usernames and passwords. If users hand all these connection credentials to us, how do we manage them more safely? We rely on two Alibaba Cloud systems: KMS, which encrypts user data with a hardware-level encryption system, and STS, a fully cloud-based third-party authentication capability. Together they provide secure access to user data and isolation or protection of sensitive data. That is the system we have today.

Another problem: different users are completely isolated from each other by various mechanisms, but a single user can have many tasks. In the Lakehouse concept there is a four-level structure: a data set contains multiple databases, a database contains multiple tables, a table has different partitions, and a partition holds different data files. Users have sub-account systems and different jobs, so their operations on the data can interfere with one another.

For example, suppose two ingestion tasks both want to write the same table. The online task A has been running normally, and then another user configures task B that also writes into the same location, which could wipe out all the data written by task A; that is very dangerous. There are also other user actions, such as deleting the data of a running job while other tasks are still accessing it without being aware of the deletion, or manipulating the table from other cloud services, other programs in the VPC, or self-deployed services, causing data problems. So we designed a set of mechanisms. On the one hand, locking is implemented at the table level: once a task has acquired write access to the data, no other task is allowed to write before the first task's lifecycle ends.
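Open-source Hudi offers a related mechanism, optimistic concurrency control with a pluggable lock provider, which enforces table-level coordination between concurrent writers. This is not necessarily the internal implementation described above, just a sketch of how the open-source options look (the ZooKeeper address, paths, and table are assumptions):

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("hudi-occ-demo").getOrCreate()
import spark.implicits._

// A tiny batch of records standing in for one writer's output.
val df = Seq((1, "order-1", 1700000000L, "2021-10-01")).toDF("id", "name", "ts", "dt")

df.write.format("hudi")
  .option("hoodie.table.name", "orders")
  .option("hoodie.datasource.write.recordkey.field", "id")
  .option("hoodie.datasource.write.precombine.field", "ts")
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  // Optimistic concurrency control: concurrent writers coordinate through a lock provider.
  .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
  .option("hoodie.cleaner.policy.failed.writes", "LAZY")
  .option("hoodie.write.lock.provider",
          "org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider")
  .option("hoodie.write.lock.zookeeper.url", "zk-host")          // hypothetical ZooKeeper host
  .option("hoodie.write.lock.zookeeper.port", "2181")
  .option("hoodie.write.lock.zookeeper.base_path", "/hudi/locks")
  .option("hoodie.write.lock.zookeeper.lock_key", "orders")
  .mode(SaveMode.Append)
  .save("oss://my-bucket/lakehouse/orders")                      // hypothetical path
```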

On the other hand, based on OSS's Bucket Policy capability, we built permission checks for different programs: only Lakehouse tasks are allowed to write data, while other programs may read but not write. Data under the same account is meant to be shared, analyzed, and used by various applications; it can be read, but it must not be polluted. We did reliability work in these areas.

Let's dig further into the architecture and come back to the overall picture to see how to understand the data model. We see the whole process as centered on rows (since the warehouse still stores data row by row within the scope of a table): building a unified ingestion, storage, analysis, and metadata model around row data. First there are the various data sources (text or binary; a binlog is binary data; Kafka can carry all kinds of binary payloads). This data is read through various connectors or readers (different systems use different names) and mapped into rows. A row carries key descriptive information, such as source information and change type, plus a mutable set of columns. Then a series of rules is applied, such as filtering some data, generating primary keys, defining the version field, type conversion, and so on. Finally, the rows are written to lake storage via Hudi payload encapsulation and conversion, metadata maintenance, and file generation.
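A hypothetical Scala sketch of what such a row-level change record might look like; the field names are illustrative only, not the internal model:

```scala
// Illustrative shape of a row-centric change record flowing toward the Hudi write.
case class RowChange(
  source: String,           // e.g. "mysql-binlog" or "kafka"
  changeType: String,       // INSERT / UPDATE / DELETE
  primaryKey: String,       // derived by the configured key-generation rules
  version: Long,            // ordering field used to pick the latest change
  columns: Map[String, Any] // the mutable column set after filtering and type conversion
)
```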

By maintaining metadata and partitions in the storage layer and connecting them to subsequent computation and analysis, we can seamlessly see the metadata of all data stored in the lake warehouse and seamlessly serve different kinds of application scenarios.

Now let's look at how we support access to common kinds of data sources. Ingesting a DB into the lake is the most common scenario; on Alibaba Cloud there are products such as RDS and PolarDB. Taking the MySQL engine as an example, there is usually a primary instance, replicas, an offline instance, and perhaps primary/replica access points, but it is always the same data. Before a DB enters the lake, a full synchronization is performed, followed by incremental synchronization. For the user, "DB into the lake" is one clear workload, but the system must do the full sync first and then automatically switch to incremental sync, and the two must be stitched together by some mechanism to guarantee data correctness. The scheduling flow obtains the DB information through the unified control service, automatically chooses the replica or the online instance with the least load, synchronizes the full data into the lake, and maintains the corresponding watermark: the time the full sync started, the lag between the replica and the primary, and so on. After the full load completes, the incremental task starts. It uses a binlog synchronization service such as DTS to replay data from the previous watermark, and uses Hudi's upsert capability to merge records according to the user-defined primary key and version, guaranteeing the consistency of the data and the correctness of analysis.
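A minimal sketch of the incremental step with the open-source Hudi Spark datasource (the table name, key fields, and path are assumptions, and this is not the internal Lakehouse code): records decoded from the binlog are upserted by primary key, with a version column deciding which change wins.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("db-to-lake-demo").getOrCreate()
import spark.implicits._

// Pretend these rows were decoded from binlog events after the full load finished.
val changes = Seq(
  (1001L, "paid",    "2021-10-01 12:00:00", "2021-10-01"),
  (1002L, "created", "2021-10-01 12:00:05", "2021-10-01")
).toDF("order_id", "status", "update_time", "dt")

changes.write.format("hudi")
  .option("hoodie.table.name", "orders")
  .option("hoodie.datasource.write.operation", "upsert")
  .option("hoodie.datasource.write.recordkey.field", "order_id")     // user-defined PK
  .option("hoodie.datasource.write.precombine.field", "update_time") // version used to merge duplicates
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .mode(SaveMode.Append)
  .save("oss://my-bucket/lakehouse/orders")   // hypothetical OSS path
```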

Maintaining the watermark requires a lot of thought. If the full load fails and is retried, where should it restart from? If the incremental task fails, it should not only remember where the increment had reached, but also keep advancing the incremental watermark step by step, rather than falling back to the pre-full-load position every time it fails, otherwise data latency would become far too severe. This information is maintained at the Lakehouse table level and is carried transparently across workload runs, restarts, retries, and so on.

The second scenario is ingesting messaging products into the lake, where we have also done some technical exploration and business trials. This data does not have a schema as clear as a DB's. For example, in the existing Kafka service on Alibaba Cloud, the schema has only two fields, Key and Value: Key identifies the message, and Value is user-defined, most of the time a JSON or binary string. Figuring out how to map this to rows involves a lot of logic, such as schema inference to recover the original structure. JSON's nested format is easy to store but hard to analyze; analysis only becomes convenient once it is flattened into a wide table, so we need logic for flattening nested structures, expanding formats, and so on, followed by the core logic described above: file writing, metadata merging, and so on. Metadata merging is needed because the number of columns at the source is indeterminate: for different rows, a column is sometimes present and sometimes not. For Hudi, this metadata has to be maintained at the application layer. Schema Evolution in Lakehouse means schema merging, column compatibility handling, automatic maintenance of newly added columns, and so on.
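A minimal sketch, assuming a hypothetical Kafka topic whose Value is a nested JSON string, of how such payloads can be parsed and flattened into wide-table rows before a Hudi write like the one sketched earlier (the topic, broker, and schema are assumptions; in practice the schema would come from inference):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types._

val spark = SparkSession.builder().appName("kafka-flatten-demo").getOrCreate()

// Fixed here for brevity; in the real pipeline this is the result of schema inference.
val valueSchema = new StructType()
  .add("event_id", StringType)
  .add("ts", LongType)
  .add("user", new StructType()
    .add("id", StringType)
    .add("region", StringType))

val raw = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "kafka-host:9092")  // hypothetical broker
  .option("subscribe", "app_events")                     // hypothetical topic
  .load()

// Parse the JSON Value and flatten the nested struct into top-level columns.
val flattened = raw
  .select(from_json(col("value").cast("string"), valueSchema).as("v"))
  .select(col("v.event_id"), col("v.ts"),
          col("v.user.id").as("user_id"), col("v.user.region").as("user_region"))
```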

We also have an internal solution based on Lindorm. Lindorm is our self-developed KV/wide-table store compatible with interfaces such as HBase and Cassandra. It holds a large number of historical files and a large volume of log data; by calling the internal LTS service, the full and incremental data is converted into columnar files through the Lakehouse pipeline to support analysis.

Both Kafka and SLS use the concepts of Partition and Shard. When traffic changes significantly, capacity has to be scaled out and in automatically, so the consumer side must actively detect the changes and keep consuming without compromising data correctness. In addition, such data is append-only, which lets us make good use of Hudi's small-file merging capability to make downstream analysis simpler, faster, and more efficient.

III. Customer Best Practices

The above was the technical exploration; next I will introduce how it is applied with customers. One customer, a cross-border e-commerce company, had the problem that its DB data was hard to analyze. They run PolarDB and MongoDB and wanted to upload all the data to OSS in near real time for analysis. The problem with federated analytics is that querying the source databases directly puts a lot of pressure on them; the better way is to ingest the data into the lake and analyze it there offline. With the Lakehouse approach they build an offline lake warehouse and then connect computation and analysis, or run ETL cleansing on top, avoiding any impact on the online data. The same architecture builds the overall data platform, so applications and analysis can flourish without affecting anything else.

The difficulty for this customer is that they have many databases and tables and a wide variety of application cases. We made many optimizations on Hudi and contributed more than 20 patches back to the community, including metadata access and part of the Schema Evolution capability, which have also been applied for this customer.

Another customer scenario is near-real-time analysis of Kafka logs. Their previous solution required people to handle a number of steps manually, including ingestion into the lake, data management, small-file merging, and so on. With the Lakehouse solution, the customer's data is connected and automatically merged into the lake, with the metadata maintained; the customer can use it directly, and the internal links are opened up automatically.

There is also the small-file issue, for which they joined the Hudi community's Clustering work in their own scenario. Clustering automatically merges small files into large files, which benefits analysis in two ways: first, the merging itself; second, during the merge the data can be sorted by specific columns, which makes subsequent access much faster.

IV. Future Outlook

Finally, I would like to share our team's thinking about the future and how Lakehouse can be applied further.

First, richer data sources for ingestion into the lake. The key value of Lakehouse lies in shielding the differences between data sources and breaking down data silos. On the cloud, many systems hold many kinds of data with great analytical value. In the future we should unify more data sources; supporting only one DB or only Kafka does not maximize customer value. The value to users becomes obvious only when enough data is aggregated into a large offline lake warehouse and the complexity is shielded. Besides cloud products, there are other ingestion forms, such as proprietary clouds, self-built systems, and self-uploaded data. The main goal is to strengthen the ability to attach to the source layer.

Second, lower-cost and more reliable storage, managed around the data lifecycle. Alibaba Cloud OSS has very rich billing options and supports multiple storage classes (Standard, Infrequent Access, Archive, and Cold Archive); the billing logic runs to dozens of items, which ordinary users cannot fully grasp. But for users, cost is always at the heart of design, especially when building a massive offline lake warehouse: as data volume grows, so does cost.

I once worked with a customer who needed to keep data for 30 years. Their business is stock analysis: they crawl all the data from exchanges and brokerages and pull it into a big lake warehouse. Because the analysis spans 30 years, cost optimization is critical. They originally chose an online system, which could not hold up after just a few months because the data volume was too large. Analytical data access follows a clear hot/cold, high-frequency/low-frequency pattern over time. Lakehouse exploits this: by defining rules and logic, it automatically handles which directories need cold storage and which need hot storage, sparing users that complex maintenance and helping them go further.

Third, better analysis capabilities. Beyond the aforementioned Clustering, Hudi's analysis acceleration also includes Compaction. Clustering means merging small files; in a logging scenario, for example, each write batch produces a file. These files are generally not large, but the smaller and more fragmented the files, the higher the cost of analytical access: reading a file requires authentication, connection setup, and metadata access. For one large file these steps happen once; for many small files they are multiplied and become very expensive. In append-only scenarios, Clustering can quickly merge small files into large ones, avoiding the linear degradation of analysis performance caused by writes and keeping analysis efficient.
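A minimal sketch of enabling Hudi's inline clustering with sort columns through the open-source options (exact option names vary slightly across Hudi versions; the thresholds and sort columns are assumptions). These options would be passed via .options(clusteringOpts) on a Hudi write like the one sketched earlier:

```scala
// Clustering options layered on top of a normal Hudi append write.
val clusteringOpts = Map(
  "hoodie.clustering.inline" -> "true",                           // run clustering as part of the write path
  "hoodie.clustering.inline.max.commits" -> "4",                  // trigger roughly every 4 commits
  "hoodie.clustering.plan.strategy.small.file.limit" ->
    (100L * 1024 * 1024).toString,                                // files under ~100 MB are candidates
  "hoodie.clustering.plan.strategy.target.file.max.bytes" ->
    (1024L * 1024 * 1024).toString,                               // aim for ~1 GB output files
  "hoodie.clustering.plan.strategy.sort.columns" -> "user_id,dt"  // sort while rewriting to speed up later filters
)
```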

In Hudi's Merge On Read tables, Deletes and Updates are first written quickly to log files, and the data is merged at read time to form a complete logical view. The problem is obvious: if there are 1,000 log files, every read has to do 1,000 merges, and analysis performance degrades badly. This is where Hudi's ability to periodically compact the logs comes in. As mentioned above, if these heavy operations, especially file merging, ran inside the same ingestion job, the computation cost would be very high and the latency of the ingestion pipeline would suffer badly. So compaction must be scheduled asynchronously to keep the write latency guaranteed. These processes are also elastic: whether 100 files or 10,000 files need to be compacted, the system can scale quickly without affecting latency, which is a big advantage.
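A minimal sketch of the open-source options for a Merge On Read table whose compaction runs outside the ingestion job (option names per the open-source Hudi docs; the threshold is an assumption):

```scala
// Options for a Merge On Read table with compaction decoupled from ingestion.
val morOpts = Map(
  "hoodie.datasource.write.table.type" -> "MERGE_ON_READ", // updates/deletes land in log files first
  "hoodie.compact.inline" -> "false",                      // do not run compaction inside the write job
  "hoodie.compact.inline.max.delta.commits" -> "10"        // delta-commit threshold used when planning compaction
)
// Passed via .options(morOpts) on the upsert write shown earlier; a separate Spark job
// (or the hudi-cli compaction commands) then executes the compaction plan asynchronously.
```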

Fourth, richer scenario-based applications. Personally, I think Lakehouse is still oriented toward the source layer, with a certain amount of aggregation on top. For higher-level aggregation with stronger real-time requirements, there are better choices for real-time data warehousing: DorisDB and ClickHouse, popular in the industry, have big advantages for real-time, high-frequency analysis. Real-time analysis built on Hudi, Lakehouse, and OSS does not have many advantages there, so building out the source layer remains the focus.

Lakehouse originally targets near-real-time ingestion, but some users do not need that much real time; periodic T+1 logical warehouse building is enough for them. With Hudi plus Lakehouse, part of the logical incremental data can be queried every day, written to Hudi, the partitions maintained, and the Schema Evolution capability applied.

Early on, as data volume grows, customers split it logically through database and table sharding. At analysis time they find there are too many databases and tables and that analysis and correlation become difficult; at that point the ability to merge multiple databases and multiple tables can be built for analysis.

Then there is cross-region aggregated analysis, which many customers need, especially those operating overseas. To serve overseas users, some customers must run part of their business abroad, particularly in cross-border e-commerce, while their procurement, warehousing, logistics, and distribution systems are all built in China. How do they integrate and analyze all this data? OSS provides cross-region replication, but only at the data level, without any logic. Lakehouse can be used for logical-layer construction: data from different regions is brought together into the same region and exposed through unified SQL join and union capabilities.
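A minimal sketch, assuming the two Hudi tables have already been replicated into one region under hypothetical OSS paths and share the same schema (including a hypothetical region column):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("cross-region-union-demo").getOrCreate()

// Two copies of the same logical table: one produced domestically, one replicated from overseas.
val ordersCn   = spark.read.format("hudi").load("oss://bucket-cn/lakehouse/orders")        // hypothetical
val ordersIntl = spark.read.format("hudi").load("oss://bucket-intl-copy/lakehouse/orders") // hypothetical

ordersCn.unionByName(ordersIntl).createOrReplaceTempView("orders_global")

// Unified SQL over the merged view.
spark.sql("SELECT region, COUNT(*) AS orders FROM orders_global GROUP BY region").show()
```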

Finally, Hudi has TimeTravel and Incremental Query capabilities. With them we build incremental ETL to cleanse different tables, making things easier for users to a certain degree. Going forward, we will build more scenario-oriented capabilities to make it easier for users to build and use lake warehouses!
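A minimal sketch of an incremental query with the open-source Hudi Spark datasource, pulling only the rows committed after a given instant, e.g. as the input to an incremental ETL step (the instant time and path are assumptions; option names vary slightly across Hudi versions):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("hudi-incremental-demo").getOrCreate()

// Read only the records committed after the given instant on the Hudi timeline.
val increment = spark.read.format("hudi")
  .option("hoodie.datasource.query.type", "incremental")
  .option("hoodie.datasource.read.begin.instanttime", "20211001000000") // hypothetical commit instant
  .load("oss://my-bucket/lakehouse/orders")                             // hypothetical path

increment.createOrReplaceTempView("orders_delta")
spark.sql("SELECT COUNT(*) FROM orders_delta").show()
```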


This article is original content from Alibaba Cloud and may not be reproduced without permission.