Introduction: For more details about Databricks DataInsight, see the product details page: https://www.aliyun.com/produc…
Author: Intelligent Technology Department, Advanced Research Center, HVAC and Building Division, Midea
Background: building the Midea HVAC IoT data platform
Midea HVAC and Building Business Unit (hereinafter Midea HVAC) is one of the five divisions of Midea Group. Its products cover multi-split units, large chillers, unitary units, room air conditioners, escalators, elevators, freight elevators, building automation software, and low-voltage integration solutions, and are sold at home and abroad in more than 200 countries. At present, moving equipment data to the cloud has stopped at the storage level: there is no platform for mining the value of the data, so large amounts of data go unused while continuing to consume storage resources and drive up storage and maintenance costs. At the same time, existing data-driven applications lack a deployment platform on which they can generate real value. A common IoT data platform is therefore urgently needed to support rapid analysis and modeling of device operating data.
Our IoT data platform is built on Alibaba Cloud Databricks DataInsight, a fully managed Spark product. The following is the overall business architecture diagram. In the later sections of this article, we share some thoughts on technology selection for the IoT data platform and on how we apply the Spark technology stack, Delta Lake in particular.
Choosing Spark & Delta Lake
For the computing engine layer of the data platform, our data team had only recently been formed, so we did a lot of research during the early architecture selection. Weighing all aspects, we wanted a mature, unified platform that supports data processing and data analysis and also supports data science well. Combined with the team's extensive experience with Python and Spark, this led us to focus on the Spark technology stack from the start.
Choosing Databricks DataInsight & Delta Lake
After several rounds of technical discussion and a hands-on proof of concept with the Alibaba Cloud computing platform team, we chose the Alibaba Cloud Databricks DataInsight product. Databricks is the company founded by the creators of Apache Spark, and its commercial Spark engine, fully managed Spark technology stack, and unified approach to data engineering and data science were important reasons behind our decision.
Specifically, Databricks DataInsight provides the following core benefits:
• Full Spark technology stack integration: one-stop integration of the Spark engine and the Delta Lake data lake, 100% compatible with the open source community edition of Spark.
• Lower total cost: the commercial versions of Spark and Delta Lake have significant performance advantages. In addition, the architecture separates compute from storage: storage relies on Alibaba Cloud OSS object storage and is accelerated by the Alibaba Cloud JindoFS cache layer.
• High-quality support and SLA guarantee: Alibaba Cloud and Databricks provide technical support covering the full Spark stack, with a commercial SLA and 24/7 support from Databricks experts.
Overall architecture of IoT data platform
The overall architecture is shown in the figure above.
The IoT data we ingest falls into two parts: existing historical data and real-time data. Currently, the historical data is imported into Delta Lake tables in daily batches from different customers' relational databases using Spark SQL. Real-time data is collected into cloud Kafka by the IoT platform and then consumed and written into Delta Lake tables in real time by Spark Structured Streaming. In this process, we sink both the real-time data and the historical data into the same Delta table, which simplifies our ETL process significantly (see the typical application scenarios later in this article). Downstream of the data pipeline, we connect data analysis and data science workflows.
IoT Data Acquisition: From Little Data to Big Data
As a typical IoT application, Midea HVAC's core data all comes from IoT terminal devices. An IoT environment contains countless terminal sensors. Viewed at a small scale, the data generated by a single sensor is small data (or little data). When all the sensors are connected into one large IoT network, the data from the different sensors reaches the cloud through gateways and eventually forms big data in the cloud.
In our scenario, the IoT platform performs preliminary parsing of the data for the different protocols, and custom hardware network devices send the parsed, semi-structured JSON data over the network to cloud Kafka. Cloud Kafka serves as the entry point to the entire data pipeline.
Data into the lake: Delta Lake
Data in IoT scenarios has the following characteristics:
• Time-series data: the records generated by a sensor carry time information. Because the data has a time attribute, different data series may be correlated. Joining different time series with an as-of join is the basis of downstream prediction and analysis.
• Real-time data: sensors generate data in real time and transmit it to the data pipeline with the lowest possible latency, where it triggers the rule engine, generates alarms and events, and notifies the relevant staff.
• Huge data volume: thousands of devices and their sensors all over the world send massive amounts of data to the platform through the access services of the IoT network.
• Diverse data protocols: different types of devices connected to the IoT platform generally use different upload protocols and different data encoding formats.
These characteristics of IoT data pose a number of challenges for data processing, data analysis, and data science, and Spark and Delta Lake address them well. Delta Lake provides ACID transaction guarantees and supports incremental table updates as well as concurrent streaming and batch writes. With Spark Structured Streaming, IoT time-series data can be written into the lake in real time.
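The as-of join named in the list of characteristics can be illustrated with a minimal, pure-Python sketch. It is not the platform's implementation: the function name `asof_join` and the sample readings are hypothetical, and a production job would run the equivalent logic in Spark. For each row of one series it attaches the most recent row of the other series whose timestamp is not later.

```python
from bisect import bisect_right

def asof_join(left, right):
    """Attach to each (ts, value) in `left` the latest `right` value with right_ts <= ts.

    Both inputs must be sorted by timestamp.
    Returns a list of (ts, left_value, right_value_or_None).
    """
    right_ts = [ts for ts, _ in right]
    joined = []
    for ts, left_value in left:
        # Index of the last right-hand row whose timestamp is <= ts (-1 if none).
        idx = bisect_right(right_ts, ts) - 1
        right_value = right[idx][1] if idx >= 0 else None
        joined.append((ts, left_value, right_value))
    return joined

# Hypothetical sensor readings: (epoch seconds, temperature)
indoor = [(10, 24.5), (20, 24.8), (35, 25.1)]
outdoor = [(12, 31.0), (30, 32.4)]

result = asof_join(indoor, outdoor)
print(result)
```

The first indoor reading has no earlier outdoor reading, so it is paired with `None`; the other two pick up the outdoor values recorded at 12 and 30 seconds.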
The following is the classic three-tier table architecture of Delta Lake. For Midea HVAC's IoT data scenario, we define the table at each tier as follows:
• Bronze table: stores the raw data. The data is consumed from Kafka by Spark Structured Streaming and upserted into the Delta Lake table. This table is the single source of truth.
• Silver table: an intermediate table produced by processing the Bronze table data for the Midea HVAC scenario. The processing involves complex time-series calculation logic, which is packaged in Pandas UDFs for Spark to execute.
• Gold table: data from the Silver table is schema-constrained and further cleaned into the Gold table, which serves downstream ad-hoc query analysis and data science.
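The flow between the three tiers can be sketched in plain Python. This only illustrates the roles of the tiers: the derived column `temperature_gap`, the `GOLD_SCHEMA` dictionary, and the sample rows are assumptions made for the example, and the real Silver logic (Pandas UDFs over time-series data) is not described in enough detail here to reproduce.

```python
# Bronze: raw records exactly as received (every value is still a string).
bronze = [
    {"uuid": "a1", "device_id": "d1", "indoor_temperature": "24.5", "outdoor_temperature": "31.0"},
    {"uuid": "a2", "device_id": "d1", "indoor_temperature": "n/a", "outdoor_temperature": "30.2"},
]

def parse_float(text):
    """Return `text` as a float, or None when it is not a number."""
    try:
        return float(text)
    except ValueError:
        return None

def to_silver(row):
    """Silver: typed columns plus a derived value (hypothetical placeholder logic)."""
    indoor = parse_float(row["indoor_temperature"])
    outdoor = parse_float(row["outdoor_temperature"])
    gap = None if indoor is None or outdoor is None else round(outdoor - indoor, 2)
    return {
        "uuid": row["uuid"],
        "device_id": row["device_id"],
        "indoor_temperature": indoor,
        "outdoor_temperature": outdoor,
        "temperature_gap": gap,
    }

# Gold: only these columns, with these types, are exposed downstream (assumed schema).
GOLD_SCHEMA = {"uuid": str, "device_id": str, "temperature_gap": float}

def to_gold(row):
    """Gold: project onto GOLD_SCHEMA; return None when a column has the wrong type."""
    projected = {name: row[name] for name in GOLD_SCHEMA}
    ok = all(isinstance(projected[name], kind) for name, kind in GOLD_SCHEMA.items())
    return projected if ok else None

silver = [to_silver(row) for row in bronze]
gold = [g for g in map(to_gold, silver) if g is not None]
print(gold)
```

The unparseable reading survives in Bronze and Silver, so nothing is lost from the source of truth, but it is filtered out before it reaches the Gold table.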
Data analysis: Ad-hoc queries
Internally, we customized our own SQL query and data visualization platform based on open source Superset. It connects to the Spark Thrift Server service of Databricks DataInsight via PyHive and can submit SQL to the cluster. The commercial version of Thrift Server has been enhanced for both availability and performance, and Databricks DataInsight provides LDAP-based user authentication to secure JDBC connections. With Superset, data analysts and data scientists can explore the data in Delta Lake tables quickly and efficiently.
Data Science: Workspace
Building energy consumption prediction and equipment fault diagnosis and prediction are the two main business goals of the HVAC IoT big data platform. Machine learning platforms therefore need to be connected downstream of the IoT data pipeline. To support data science scenarios more quickly and easily, we connected the Databricks DataInsight cluster with DDC, the Alibaba Cloud data development platform. DDC integrates Jupyter Notebook, which is friendlier for data science work; using PySpark in Jupyter, jobs can be submitted to the Databricks DataInsight cluster, and they can also be scheduled with Apache Airflow. In addition, to cover the basic steps of the machine learning lifecycle such as model building, iterative training, metric monitoring, and deployment, we are exploring MLOps, which is still at the preparation stage.
Typical application scenarios
Ingesting data into Delta Lake (unified batch and stream)
Define a function containing the MERGE rules for writing streaming data to Delta Lake
%spark
import org.apache.spark.sql._
import io.delta.tables._

// Upsert one micro-batch (`microBatchOutputDF`) into the Delta table using MERGE.
// NOTE: `delta_{table_name}` is a placeholder; replace it with the real table name.
def upsertToDelta(microBatchOutputDF: DataFrame, batchId: Long): Unit = {
  // Expose the micro-batch as a temporary view so that SQL can reference it
  microBatchOutputDF.createOrReplaceTempView("updates")
  // Use the view name to apply MERGE
  // NOTE: You have to use the SparkSession that has been used to define the `updates` dataframe
  microBatchOutputDF.sparkSession.sql(s"""
    MERGE INTO delta_{table_name} t
    USING updates s
    ON s.uuid = t.uuid
    WHEN MATCHED THEN UPDATE SET
      t.device_id = s.device_id,
      t.indoor_temperature = s.indoor_temperature,
      t.outdoor_temperature = s.outdoor_temperature,
      t.chiller_temperature = s.chiller_temperature,
      t.electricity = s.electricity,
      t.protocal_version = s.protocal_version,
      t.dt = s.dt,
      t.update_time = current_timestamp()
    WHEN NOT MATCHED THEN INSERT
      (uuid, device_id, indoor_temperature, outdoor_temperature, chiller_temperature,
       electricity, protocal_version, dt, create_time, update_time)
    VALUES
      (s.uuid, s.device_id, s.indoor_temperature, s.outdoor_temperature, s.chiller_temperature,
       s.electricity, s.protocal_version, s.dt, current_timestamp(), current_timestamp())
  """)
}
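To make the MERGE rules easier to follow, here is a small pure-Python model of the same upsert semantics. It is only an illustration: the dictionary `table` stands in for the Delta table, `now` stands in for `current_timestamp()`, and the function name `merge_upsert` is hypothetical.

```python
def merge_upsert(target, updates, now):
    """Apply MERGE-style upsert semantics keyed on `uuid`, in place.

    target:  dict mapping uuid -> row dict (stands in for the Delta table)
    updates: list of row dicts from one micro-batch
    now:     value recorded in the timestamp columns
    """
    for row in updates:
        key = row["uuid"]
        if key in target:
            # WHEN MATCHED: overwrite the data columns and refresh update_time only.
            target[key].update(row)
            target[key]["update_time"] = now
        else:
            # WHEN NOT MATCHED: insert the row and set both timestamps.
            target[key] = {**row, "create_time": now, "update_time": now}
    return target

table = {}
merge_upsert(table, [{"uuid": "a1", "device_id": "d1", "electricity": "1.2"}], now=100)
merge_upsert(
    table,
    [
        {"uuid": "a1", "device_id": "d1", "electricity": "1.5"},
        {"uuid": "a2", "device_id": "d2", "electricity": "0.7"},
    ],
    now=160,
)
print(table)
```

One difference is worth noting: in this model a later row in the same batch silently wins when two rows share a `uuid`, whereas a Delta Lake MERGE raises an error if several source rows match the same target row, so a real micro-batch may need to be deduplicated by `uuid` first.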
Write to Delta Lake in real time with Spark Structured Streaming
%spark
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

def getquery(checkpoint_dir: String, tableName: String, servers: String, topic: String): StreamingQuery = {
  // Read the raw messages from Kafka
  val streamingInputDF = spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", servers)
    .option("subscribe", topic)
    .option("startingOffsets", "latest")
    .option("minPartitions", "10")
    .option("failOnDataLoss", "true")
    .load()

  // Split each space-delimited message into named columns
  val resDF = streamingInputDF
    .select(col("value").cast("string"))
    .withColumn("newMessage", split(col("value"), " "))
    .filter(col("newMessage").getItem(7).isNotNull) // keep messages with at least 8 fields
    .select(
      col("newMessage").getItem(0).as("uuid"),
      col("newMessage").getItem(1).as("device_id"),
      col("newMessage").getItem(2).as("indoor_temperature"),
      col("newMessage").getItem(3).as("outdoor_temperature"),
      col("newMessage").getItem(4).as("chiller_temperature"),
      col("newMessage").getItem(5).as("electricity"),
      col("newMessage").getItem(6).as("protocal_version")
    )
    .withColumn("dt", date_format(current_date(), "yyyyMMdd"))

  val query = resDF
    .writeStream
    .format("delta")
    .option("checkpointLocation", checkpoint_dir)
    .trigger(Trigger.ProcessingTime("60 seconds")) // micro-batch interval
    .foreachBatch(upsertToDelta _)                 // apply the MERGE rules in upsertToDelta
    .outputMode("update")

  // Start the stream and return the handle so the caller can monitor or stop it
  query.start()
}
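The parsing step of the streaming job can be checked on a single message without a cluster. The following sketch mirrors the split, filter, and select logic in plain Python; the function name `parse_message` and the sample message are hypothetical, and the field list follows the column names used in the job.

```python
from datetime import date

# Column names in message order; spelling follows the table definition.
FIELDS = [
    "uuid",
    "device_id",
    "indoor_temperature",
    "outdoor_temperature",
    "chiller_temperature",
    "electricity",
    "protocal_version",
]

def parse_message(value, today):
    """Split one space-delimited message into a record, or return None if it is too short."""
    parts = value.split(" ")
    # Mirrors the filter on getItem(7): at least 8 fields must be present.
    if len(parts) < 8:
        return None
    record = dict(zip(FIELDS, parts))  # only the first 7 fields are kept
    record["dt"] = today.strftime("%Y%m%d")
    return record

good = parse_message("u1 d1 24.5 31.0 7.2 1.5 v2 extra", date(2021, 3, 1))
bad = parse_message("u2 d1 24.5", date(2021, 3, 1))
print(good)
print(bad)
```

As in the streaming job, all values stay strings at this stage; type conversion is left to later processing.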
Data disaster recovery: Deep Clone
Because the Delta Lake table initially received only real-time data, we used Spark SQL to sink the existing historical data into the same Delta Lake table in a one-time load. As a result, we maintain only one Delta table for both stream and batch processing, and the two data sets had to be merged only once, at the start. To keep the data safe, we use Databricks Deep Clone for disaster recovery: a clone table is refreshed on a daily schedule and kept as a backup. For each day's new data, Deep Clone only inserts the new rows and updates the rows that changed, which greatly improves performance.
CREATE OR REPLACE TABLE delta.delta_{table_name}_clone
DEEP CLONE delta.delta_{table_name};
Performance optimization: OPTIMIZE & Z-Ordering
Stream processing produces a large number of small files, which seriously degrades read performance. Delta Lake provides the OPTIMIZE command to compact small files into larger ones, and for ad-hoc queries it provides Z-Ordering, which co-locates related data within a table and can effectively improve query performance. Delta Lake also offers an Auto Optimize option, but it sacrifices a small amount of write performance and increases the latency of writes to the Delta table. Running the OPTIMIZE command separately, by contrast, does not affect write performance, because Delta Lake supports MVCC and allows writes to proceed concurrently with OPTIMIZE. We therefore trigger OPTIMIZE on a schedule, compacting small files every hour and cleaning up stale data files with VACUUM. Because the 1-hour retention used here is shorter than VACUUM's default 7-day threshold, the retention duration check is disabled first:
OPTIMIZE delta.delta_{table_name} ZORDER by device_id, indoor_temperature;
set spark.databricks.delta.retentionDurationCheck.enabled = false;
VACUUM delta.delta_{table_name} RETAIN 1 HOURS;
In addition, because ad-hoc queries filter a single table on multiple dimensions, the Z-Ordering mechanism provided by Delta Lake (the ZORDER BY clause above) effectively improves query performance.
Summary and outlook
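Why Z-Ordering helps multi-dimensional filters can be shown with a toy model. The sketch below uses the general Morton (Z-order) curve idea and per-file minimum/maximum statistics; it is not Delta Lake's actual implementation, and the 4 x 4 grid, the file size of 4 rows, and all function names are assumptions made for the example.

```python
def interleave_bits(x, y, bits=8):
    """Morton (Z-order) code of two non-negative integers smaller than 2**bits."""
    code = 0
    for i in range(bits):
        code |= ((x >> i) & 1) << (2 * i)      # bits of x go to even positions
        code |= ((y >> i) & 1) << (2 * i + 1)  # bits of y go to odd positions
    return code

def split_into_files(rows, size=4):
    """Cut a sorted list of rows into consecutive 'files' of `size` rows."""
    return [rows[i:i + size] for i in range(0, len(rows), size)]

def files_touched(files, dim, value):
    """Count files whose min/max range on column `dim` could contain `value`."""
    return sum(
        1 for f in files
        if min(p[dim] for p in f) <= value <= max(p[dim] for p in f)
    )

# Every combination of two columns with values 0..3.
points = [(x, y) for x in range(4) for y in range(4)]

linear_files = split_into_files(sorted(points))  # sorted by x, then y
z_files = split_into_files(sorted(points, key=lambda p: interleave_bits(*p)))

print(files_touched(linear_files, 0, 0), files_touched(linear_files, 1, 0))
print(files_touched(z_files, 0, 0), files_touched(z_files, 1, 0))
```

With a plain sort on the first column, a filter on that column reads 1 file but a filter on the second column reads all 4. With the Z-order layout, a filter on either column reads 2 files, which is the trade-off that makes it suitable for queries over several dimensions.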
We quickly built our IoT data processing platform on the commercial Spark and Delta Lake technology stack provided by the Alibaba Cloud Databricks DataInsight product. Databricks DataInsight is fully managed and maintenance-free, offers the performance advantages of the commercial engine, and separates compute from storage, all of which lowers our total cost. At the same time, the rich features of Databricks DataInsight have greatly increased the productivity of our data team and laid the foundation for rapid delivery of our data analytics business. In the future, Midea HVAC expects to work with the Alibaba Cloud Databricks DataInsight team to deliver more industry-leading solutions for IoT scenarios.
This article is original content from Alibaba Cloud and may not be reproduced without permission.