This article was written by the Aliyun Data Lake Construction (DLF) team and the Databricks Data Insight (DDI) team to help you gain a deeper understanding of Aliyun Data Lake Construction (DLF) and Databricks Data Insight (DDI).

Authors: Chen Xinwei (Xi Kang), technical expert, Alibaba Cloud Computing Platform Division; Feng Jialiang (highlight), technical R&D, Alibaba Cloud Computing Platform Division

Background

As the data era develops, data volumes grow explosively and data formats become more diverse. The drawbacks of the traditional data warehouse model, such as high cost, slow response, and limited format support, are increasingly prominent. This gave rise to the data lake, which offers lower cost, richer data forms, and more flexible analysis and computation.

As a centralized data repository, a data lake supports diverse data types, including structured, semi-structured, and unstructured data. Data sources include database data, binlog incremental data, log data, and existing data in data warehouses. A data lake stores and manages data from these different sources and formats centrally in cost-effective storage such as OSS object storage, provides a unified data catalog, and supports multiple ways of computation and analysis. This effectively solves the problem of data silos within the enterprise while greatly reducing the cost of storing and using data.

Data lake architecture and key technologies

The enterprise-level data lake architecture is as follows:

Data lake storage and format

Data lake storage mainly uses cloud object storage as its medium, which offers low cost, high stability, high scalability, and other advantages.

On top of that storage, we can use ACID-capable data lake storage formats such as Delta Lake, Hudi, and Iceberg. These formats have their own metadata management capabilities and support operations such as Update and Delete, solving the problem of real-time data updates in big data scenarios in a unified batch-and-streaming way. In this article, we focus on the core capabilities and application scenarios of Delta Lake.
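To make the Update and Delete capability concrete, here is a minimal sketch that creates a small Delta table on OSS and then modifies it in place. The %spark notebook environment, OSS path, and column names are illustrative assumptions, not part of the original setup.

%spark
// Minimal sketch: create a Delta table on OSS, then update and delete records in place.
// The OSS path and columns are placeholders.
import io.delta.tables.DeltaTable
import org.apache.spark.sql.functions._
import spark.implicits._

val demoPath = "oss://your-bucket/delta/demo_metrics"

// Create the table.
Seq(("1107108111", 12), ("1107108122", 13))
  .toDF("sn", "temperature")
  .write.format("delta").mode("overwrite").save(demoPath)

// Update and delete against the same table -- capabilities that plain
// Parquet/ORC directories on object storage do not offer.
val tbl = DeltaTable.forPath(spark, demoPath)
tbl.update(col("sn") === "1107108111", Map("temperature" -> lit(20)))   // in-place update
tbl.delete(col("sn") === "1107108122")                                  // in-place delete

spark.read.format("delta").load(demoPath).show()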

Core capabilities of Delta Lake

Delta Lake is a unified data management system that brings data reliability and fast analytics to data lakes in the cloud. Delta Lake runs on top of an existing data lake and is fully compatible with Apache Spark’s APIs. With Delta Lake, you can speed up the ingestion of high-quality data into the data lake, and teams can quickly consume that data on cloud services in a safe and scalable way.

  • ACID transactions: Delta Lake provides ACID transactionality across multiple write operations. Each write is a transaction, and the writes recorded in the transaction log have a serial order. The transaction log tracks writes at the file level and uses optimistic concurrency control, which is well suited to data lakes because multiple writes attempting to modify the same file are rare. When a conflict occurs, Delta Lake throws a concurrent modification exception so the user can handle it and retry the job. Delta Lake also provides the strongest isolation level (serializable isolation), allowing engineers to continuously write data to a directory or table while consumers continuously read from the same directory or table, always seeing the latest snapshot of the data.
  • Schema management: Delta Lake automatically verifies that the schema of the DataFrame being written is compatible with the schema of the table. Columns that exist in the table but not in the DataFrame are set to NULL; if the DataFrame contains additional columns that are not in the table, the write throws an exception. Delta Lake also has DDL (Data Definition Language) capabilities to add new columns explicitly and to update the schema automatically.
  • Scalable metadata handling: Delta Lake stores the metadata of a table or directory in the transaction log rather than in a metastore. This allows Delta Lake to list files in large directories in constant time and makes reads very efficient.
  • Data versioning and time travel: Delta Lake allows users to read snapshots of historical versions of a table or directory. When a file is modified during a write, Delta Lake creates a new version of the file and retains the old one. To read an older version of a table or directory, users pass a timestamp or version number to Apache Spark’s read API, and Delta Lake builds a complete snapshot for that timestamp or version from the information in the transaction log. This makes it easy to reproduce experiments and reports and, if necessary, to restore a table to an older version (see the sketch after this list).
  • Unified batch and streaming: In addition to batch writes, Delta Lake can be used as an efficient streaming sink for Apache Spark Structured Streaming. Combined with ACID transactions and scalable metadata handling, the efficient streaming sink supports many near-real-time analysis use cases without maintaining separate, complex streaming and batch pipelines.
  • Record update and delete: Delta Lake supports DML (Data Manipulation Language) commands for merge, update, and delete. This lets engineers easily upsert and delete records in the data lake and simplifies change data capture and GDPR (General Data Protection Regulation) use cases. Because Delta Lake tracks and modifies data at file-level granularity, it is much more efficient than reading and overwriting an entire partition or table.
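The time-travel capability mentioned above can be exercised with a few lines of Spark code. In this sketch the path, version number, and timestamp are placeholders:

%spark
// Minimal time-travel sketch; the path, version, and timestamp are placeholders.
import io.delta.tables.DeltaTable

val demoPath = "oss://your-bucket/delta/demo_metrics"

// Latest snapshot.
spark.read.format("delta").load(demoPath).show()

// Snapshot by version number ...
spark.read.format("delta").option("versionAsOf", "0").load(demoPath).show()

// ... or by timestamp.
spark.read.format("delta").option("timestampAsOf", "2021-06-01 00:00:00").load(demoPath).show()

// The transaction history that time travel is built on.
DeltaTable.forPath(spark, demoPath).history().show(false)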

Data lake construction and management

1. Data ingestion into the lake

The original data of an enterprise lives in multiple databases or storage systems, such as the relational database MySQL, the log service SLS, the NoSQL store HBase, and the message queue Kafka. Most of these online stores serve online transactional workloads and are not suitable for analytical scenarios, so the data needs to be synchronized, in a non-intrusive way, to object storage that is cheaper and better suited to computational analysis.

Common data synchronization approaches include batch synchronization based on tools such as DataX and Sqoop, and, for real-time scenarios, streaming synchronization links such as Kafka + Spark Streaming/Flink. Today, many cloud vendors provide one-stop solutions that help customers get data into the lake faster and more cheaply, such as Ali Cloud DLF's lake-entry tasks; a minimal hand-rolled alternative is sketched below for comparison.
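The sketch below shows what a minimal hand-rolled full batch sync could look like: reading the source MySQL table over JDBC and landing it as a Delta table on OSS. The host, credentials, and paths are placeholders, and incremental sync would additionally require binlog/CDC tooling, which is what the DLF lake-entry task manages for you.

%spark
// Minimal hand-rolled full sync: MySQL (via JDBC) -> Delta table on OSS.
// Host, credentials, and paths are placeholders; the MySQL JDBC driver must be on the classpath.
val src = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://your-rds-host:3306/dlfdb")
  .option("dbtable", "engine_funcs")
  .option("user", "dlf_admin")
  .option("password", "***")
  .load()

src.write
  .format("delta")
  .mode("overwrite")   // full sync only; incremental sync needs binlog/CDC capture
  .save("oss://your-bucket/warehouse/engine_funcs_delta")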

2. Unified metadata service

Object storage itself does not carry big data analysis semantics; it needs to be combined with a metadata service such as Hive Metastore Service to provide meta information to upper-layer analysis engines. The design goal of a data lake metadata service is to build a unified metadata view across diverse big data engines, storage systems, and storage formats, and to provide unified permissions and metadata to different compute engines. It also needs to be compatible with and extend the metadata services of the open-source data ecosystem, and to support automatic metadata discovery, so that data is managed once and used many times. This keeps it compatible with the open-source ecosystem while remaining easy to use.

Data lake computation and analysis

Compared with a data warehouse, a data lake connects to different computing engines in a more open way. It supports traditional open-source big data engines such as Hive, Spark, Presto, and Flink, as well as cloud vendors’ own big data engines, such as Aliyun MaxCompute and Hologres. Between data lake storage and the compute engines, a data lake acceleration layer is also typically provided to improve analysis performance while reducing bandwidth cost and pressure.

Databricks Data Insight – a data computing and analysis engine based on the commercial version of Spark

Databricks Data Insight (DDI) is a fully managed Spark analysis engine on Aliyun that enables users to quickly and easily compute and analyze data in the data lake.

  • **Fully managed SaaS Spark:** no O&M and no need to manage underlying resources, which reduces operations cost and lets teams focus on analytics.
  • **Complete Spark stack integration:** one-stop integration of the Spark engine and the Delta Lake data lake, 100% compatible with the open-source Spark Community edition; Databricks provides commercial support for the latest Spark features.
  • **Lower total cost:** significant performance advantages of commercial Spark and Delta Lake; combined with a compute-storage separation architecture, where storage relies on Aliyun OSS object storage accelerated by the Aliyun JindoFS cache layer, this effectively reduces the total cost of the cluster.
  • **High-quality support and SLA guarantees:** Aliyun and Databricks provide full-stack Spark technical support, commercial SLA guarantees, and 7×24 Databricks expert support services.

Databricks Data Insight + DLF data lake construction and stream/batch analysis practice

The construction and use of a data lake generally go through several stages: data ingestion into the lake, data lake storage and management, and data lake exploration and analysis. This article mainly introduces one-stop data ingestion based on Ali Cloud Data Lake Construction (DLF) + Databricks Data Insight (DDI), together with hands-on streaming and batch data analysis.

Stream processing scenario:

The real-time scenario maintains and updates two Delta tables:

  • delta_aggregates_func table: RDS data ingested into the lake in real time.
  • delta_aggregates_metrics table: industrial metric data collected through the IoT platform into Cloud Kafka and streamed into the lake in real time via Spark Structured Streaming (a plausible schema sketch follows this list).
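The article assumes that the delta_aggregates_metrics table already exists before the stream starts. A plausible DDL, inferred from the columns used by the streaming merge later in this article (sn, name, value, create_time, update_time), is sketched below; the column types and OSS location are assumptions to adapt to your environment.

%spark
// Plausible DDL for the metrics table, inferred from the columns used by the merge below.
// Column types and the OSS location are assumptions; the database may also be
// created and managed via the DLF console instead of here.
spark.sql("CREATE DATABASE IF NOT EXISTS log_data_warehouse_dlf")
spark.sql("""
  CREATE TABLE IF NOT EXISTS log_data_warehouse_dlf.delta_aggregates_metrics (
    sn          STRING,
    name        STRING,
    value       STRING,
    create_time TIMESTAMP,
    update_time TIMESTAMP
  )
  USING delta
  LOCATION 'oss://databricks-data-source/warehouse/delta_aggregates_metrics'
""")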

Batch scenario:

Using the two Delta tables produced in the real-time scenario as data sources, analyze the data by running Spark jobs, scheduled and executed through Databricks Data Insight's job scheduling.

Prerequisites

1. Enable services

Ensure that cloud products and services such as DLF, OSS, Kafka, DDI, RDS, and DTS have been enabled. The DLF, RDS, Kafka, and DDI instances must be in the same Region.

2. Obtain RDS data

RDS data preparation: create the database DLFDB in RDS. In the account center, create a user account that can read the engine_funcs table, such as DLF_admin.

Log in to the database through DMS, run the following statements to create the engine_funcs table, and insert a small amount of data.

CREATE TABLE `engine_funcs` (
  `emp_no` int(11) NOT NULL,
  `engine_serial_number` varchar(20) NOT NULL,
  `engine_serial_name` varchar(20) NOT NULL,
  `target_engine_serial_number` varchar(20) NOT NULL,
  `target_engine_serial_name` varchar(20) NOT NULL,
  `operator` varchar(16) NOT NULL,
  `create_time` DATETIME NOT NULL,
  `update_time` DATETIME NOT NULL,
  PRIMARY KEY (`emp_no`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

INSERT INTO `engine_funcs` VALUES (10001,'1107108133','temperature','1107108144','temperature','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10002,'1107108155','temperature','1107108133','temperature','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10003,'1107108155','runTime','1107108166','speed','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10004,'1107108177','pressure','1107108155','electricity','/', now(), now());
INSERT INTO `engine_funcs` VALUES (10005,'1107108188','flow' ,'1107108111','runTime','/', now(), now());

Ingest RDS data into the lake in real time

1. Create a data source

  • Go to the DLF console: dlf.console.aliyun.com/cn-hangzhou… and open "Data into the Lake" -> "Data Source Management".
  • Click "New Data Source". Enter a connection name, select the RDS instance used in the data preparation, enter the account and password, and click "Connection Test" to verify network connectivity and account availability.

  • Click "Next", then "OK"; the data source is created.

2. Create the metadata database

Create a new bucket named databricks-data-source in OSS.

Click the left menu "Metadata Management" -> "Metadata", then click "New Metadata". Enter a name, create the directory dlf/, and select it.

3. Create a lake-entry task

  • Click the menu "Data into the Lake" -> "Lake Entry Task Management", then click "New Lake Entry Task".
  • Select "Real-time ingestion of relational database", fill in the data source, target data lake, task configuration, and other information as shown in the figure below, and save.
  • For the data source configuration, select the newly created "dlf" connection, use the table path "dlf/engine_funcs", select "New DTS subscription", and fill in a name.

  • Go back to the task management page and click "Run" on the newly created lake-entry task. You will see the task enter the "Initializing" state and then the "Running" state.
  • Click "Details" to open the task details page, where you can see the corresponding database and table information.

The lake-entry task is a full + incremental task. After about 3 to 5 minutes the full data is imported, and the task then automatically switches to real-time monitoring. Any subsequent data updates are automatically applied to the Delta Lake data.

Data lake exploration and analysis

DLF data query exploration

DLF provides lightweight data preview and exploration capabilities. Click the menu "Data Exploration" -> "SQL Query" to enter the data query page.

In the metadata table list, find fjl_dlf and expand it; you can see that the engine_funcs_delta table has been created automatically. Double-click the table name, and the SQL statement for querying the table appears in the SQL editor on the right. Click "Run" to get the query result.

Go back to the DMS console and run the DELETE, UPDATE, and INSERT SQL statements below.

DELETE FROM `engine_funcs` where `emp_no` = 10001;
UPDATE `engine_funcs` SET `operator` = '+', `update_time` = NOW() WHERE `emp_no` =10002;
INSERT INTO `engine_funcs` VALUES (20001,'1107108199','speed','1107108122','runTime','*', now(), now());

After about 1 to 3 minutes, run the SELECT statement again in DLF Data Exploration; all the data updates have been synchronized to the data lake.

Create a Databricks Data Insight (DDI) cluster

  • After the cluster is created, click "Details" to go to the details page and add the IP address of the machine you are accessing from to the whitelist.

  • Click "Notebook" to enter the interactive analysis page and query the engine_funcs_delta table data that has been synchronized into Delta Lake.

Collect IoT platform data from Cloud Kafka and write it to Delta Lake in real time

1. Introduce the spark-sql-kafka third-party dependency

%spark.conf
spark.jars.packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1

2. Define the merge (upsert) function used when writing streaming data to Delta Lake

Format of test data sent to Kafka:

{"sn": "1107108111","temperature": "12" ,"speed":"1115", "runTime":"160","pressure":"210","electricity":"380","flow":"740","dia":"330"} {"sn": "1107108122","temperature": "13" ,"speed":"1015", "runTime":"150","pressure":"220","electricity":"390","flow":"787","dia":"340"} {"sn": "1107108133","temperature": "14" ,"speed":"1215", "runTime":"140","pressure":"230","electricity":"377","flow":"777","dia":"345"} {"sn": "1107108144","temperature": "15" ,"speed":"1315", "runTime":"145","pressure":"240","electricity":"367","flow":"730","dia":"430"} {"sn": "1107108155","temperature": "16" ,"speed":"1415", "runTime":"155","pressure":"250","electricity":"383","flow":"750","dia":"345"} {"sn": "1107108166","temperature": "10" ,"speed":"1515", "runTime":"145","pressure":"260","electricity":"350","flow":"734","dia":"365"} {"sn": "1107108177","temperature": "12" ,"speed":"1115", "runTime":"160","pressure":"210","electricity":"377","flow":"733","dia":"330"} {"sn": "1107108188","temperature": "13" ,"speed":"1015", "runTime":"150","pressure":"220","electricity":"381","flow":"737","dia":"340"} {"sn": "1107108199","temperature": "14" ,"speed":"1215", "runTime":"140","pressure":"230","electricity":"378","flow":"747","dia":"345"} %spark import org.apache.spark.sql._ import io.delta.tables._ def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long) {microBatchOutputDF. CreateOrReplaceTempView (" dataStream ") / / DF executive convection data column turned operation; val df=microBatchOutputDF.sparkSession.sql(s""" select `sn`, stack(7, 'temperature', `temperature`, 'speed', `speed`, 'runTime', `runTime`, 'pressure', `pressure`, 'electricity', `electricity`, 'flow', `flow` , 'dia', `dia`) as (`name`, ` value `) from dataStream "" "). Df createOrReplaceTempView (" updates") / / real-time dynamic data updating, SQL (s""" merge INTO delta_aggregates_metrics t USING updates s ON s.metrics = t.metrics val mergedf= df.sparksession. SQL (s""" merge INTO delta_aggregates_metrics t USING updates s ON s.metrics = t.metrics and s.name=t.name WHEN MATCHED THEN UPDATE SET t.value = s.value, t.update_time=current_timestamp() WHEN NOT MATCHED THEN INSERT (t.sn,t.name,t.value ,t.create_time,t.update_time) values  (s.sn,s.name,s.value,current_timestamp(),current_timestamp()) """) }Copy the code

3. Use Spark Structured Streaming to write real-time streams to Delta Lake

%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger

def getquery(checkpoint_dir: String, servers: String, topic: String) {
  // Read the raw JSON records from Kafka as a streaming DataFrame.
  var streamingInputDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("subscribe", topic)
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()
  // Extract the metric fields from the JSON payload.
  var streamingSelectDF = streamingInputDF.select(
    get_json_object(($"value").cast("string"), "$.sn").alias("sn"),
    get_json_object(($"value").cast("string"), "$.temperature").alias("temperature"),
    get_json_object(($"value").cast("string"), "$.speed").alias("speed"),
    get_json_object(($"value").cast("string"), "$.runTime").alias("runTime"),
    get_json_object(($"value").cast("string"), "$.electricity").alias("electricity"),
    get_json_object(($"value").cast("string"), "$.flow").alias("flow"),
    get_json_object(($"value").cast("string"), "$.dia").alias("dia"),
    get_json_object(($"value").cast("string"), "$.pressure").alias("pressure")
  )
  val query = streamingSelectDF
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .foreachBatch(upsertToDelta _)   // apply the upsertToDelta merge function defined above
    .outputMode("update")
    .start()
}

4. Execute the program

%spark
val my_checkpoint_dir = "oss://databricks-data-source/checkpoint/ck"
val servers = "***.***.***.***:9092"
val topic = "your-topic"
getquery(my_checkpoint_dir, servers, topic)

5. Start the Kafka producer and send the test data

  • Query the data being written and updated in real time

  • Query the engine_funcs_delta data ingested from MySQL in real time

    %spark
    val rds_dataV = spark.table("fjl_dlf.engine_funcs_delta")
    rds_dataV.show()

Batch job

To meet the business requirement, we need to join the corresponding value fields from delta_aggregates_metrics onto the engine_funcs_delta table.

%spark
// Read the continuously updated delta_aggregates_metrics table
val aggregateDF = spark.table("log_data_warehouse_dlf.delta_aggregates_metrics")
// Read the continuously updated engine_funcs_delta table
val rds_dataV = spark.table("fjl_dlf.engine_funcs_delta").drop("create_time", "update_time")
// rds_dataV.show()
val aggregateSDF = aggregateDF
  .withColumnRenamed("value", "esn_value")
  .withColumnRenamed("name", "engine_serial_name")
  .withColumnRenamed("sn", "engine_serial_number")
// aggregateSDF.show()
val aggregateTDF = aggregateDF
  .withColumnRenamed("value", "tesn_value")
  .withColumnRenamed("name", "target_engine_serial_name")
  .withColumnRenamed("sn", "target_engine_serial_number")
  .drop("create_time", "update_time")
// aggregateTDF.show()
// Join the value columns from delta_aggregates_metrics onto engine_funcs_delta
val resdf = rds_dataV
  .join(aggregateSDF, Seq("engine_serial_name", "engine_serial_number"), "left")
  .join(aggregateTDF, Seq("target_engine_serial_number", "target_engine_serial_name"), "left")
  .selectExpr("engine_serial_number", "engine_serial_name", "esn_value",
              "target_engine_serial_number", "target_engine_serial_name", "tesn_value",
              "operator", "create_time", "update_time")
resdf.show(false)
resdf.write.format("delta")
  .mode("append")
  .saveAsTable("log_data_warehouse_dlf.delta_result")

OPTIMIZE & Z-ordering

Stream processing produces a large number of small files, which severely degrades the read performance of the data system. Delta Lake provides the OPTIMIZE command to compact small files, as well as the Z-Ordering mechanism, which effectively improves the performance of ad-hoc queries that filter on multiple dimensions of a single table. Delta Lake also offers an Auto Optimize option, but it increases write latency to the Delta table and sacrifices some write performance. By contrast, running the OPTIMIZE command does not affect writes, because Delta Lake supports MVCC and OPTIMIZE can run concurrently with writes. We therefore use a solution that triggers OPTIMIZE periodically, compacting small files every hour, and runs VACUUM to clean up expired data files:

OPTIMIZE log_data_warehouse_dlf.delta_result ZORDER by engine_serial_number;

VACUUM log_data_warehouse_dlf.delta_result RETAIN 1 HOURS;
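A minimal sketch of running this maintenance from a scheduled %spark notebook job is shown below. A 1-hour retention is shorter than Delta Lake's default 7-day safety window, so the retention duration check has to be relaxed explicitly; only do this if no job needs to read older snapshots.

%spark
// Hourly maintenance job: compact small files, then clean up expired files.
// Relaxing the retention check is required because RETAIN 1 HOURS is below the default 7-day window.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")

spark.sql("OPTIMIZE log_data_warehouse_dlf.delta_result ZORDER BY (engine_serial_number)")
spark.sql("VACUUM log_data_warehouse_dlf.delta_result RETAIN 1 HOURS")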

Original link

This article is original content from Alibaba Cloud and may not be reproduced without permission.