Abstract: CarbonData serves as an intermediary between Apache Spark and storage systems, providing four important functions for Spark.
This article is shared by the Huawei Cloud community, "Make Apache Spark better with CarbonData", originally written by: big data practitioner.
Spark is certainly a powerful processing engine and a distributed cluster computing framework for faster processing. Unfortunately, Spark also falls short in a few areas. If we use Apache Spark in conjunction with Apache CarbonData, it can overcome these shortcomings:
1. ACID transactions are not supported
2. No schema enforcement
3. Small file problems
4. Inefficient data skipping
What is ACID?
Spark and ACID
ATOMICITY
The A in ACID stands for atomicity. Basically, this means all or nothing. Therefore, when you use the Spark DataFrame Writer API, it should write complete data or none at all. Let’s take a quick look at the Spark documentation. According to the Spark documentation: “It is important to realize that these save modes (overwrite) do not utilize any locking and are not atomic. Additionally, when performing an Overwrite, the data will be deleted before writing out the new data.”
While the whole situation seems a little scary, it’s actually not that bad. The Spark DataFrame API performs job-level commits internally, which helps achieve a degree of atomicity; this works for the “append” mode using Hadoop’s FileOutputCommitter. However, the default implementation introduces performance overhead, especially when cloud storage (S3/OBS) is used instead of HDFS.
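As an illustration of the committer involved, here is a minimal sketch of how the Hadoop output committer can be tuned on a Spark session. The configuration key is a standard Hadoop setting; choosing algorithm version 2 (which reduces the final rename cost on object stores at the expense of some fault-tolerance guarantees) is only an example, not a recommendation from the original article.

```scala
import org.apache.spark.sql.SparkSession

// The job-level commit described above is performed by Hadoop's FileOutputCommitter.
// Version 2 of the commit algorithm moves task output directly into place and avoids
// the expensive final rename, which matters most on object stores such as S3/OBS.
val spark = SparkSession.builder()
  .appName("committer-config-demo")
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
  .getOrCreate()
```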
We can now run the following code to show that a Spark overwrite is not atomic and can cause data corruption or data loss. The first part of the code mimics Job 1, which creates 100 records and saves them to the ACIDpath directory. The second part mimics Job 2, which attempts to overwrite the existing data but throws an exception partway through. The result of these two jobs is data loss: in the end, we lose the data created by the first job.
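The snippet below is a minimal sketch of that experiment (the original listing is not reproduced here); the path, the column name, and the simulated failure after 50 records are assumptions made purely for illustration.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().appName("acid-overwrite-demo").getOrCreate()
import spark.implicits._

val acidPath = "/tmp/ACIDpath"   // assumed target directory

// Job 1: create 100 records and save them
spark.range(1, 101).toDF("value")
  .write.mode(SaveMode.Overwrite).parquet(acidPath)

// Job 2: overwrite the same path, but fail partway through the write
try {
  spark.range(1, 101).as[Long]
    .map { id => if (id > 50) throw new RuntimeException("simulated failure"); id }
    .toDF("value")
    .write.mode(SaveMode.Overwrite).parquet(acidPath)
} catch {
  case e: Exception => println("Job 2 failed: " + e.getMessage)
}

// The old files were already deleted and the failed job never committed new ones,
// so the original 100 rows are gone (this read finds no committed data).
spark.read.parquet(acidPath).count()
```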
Because of the exception, the job-level commit does not occur, so the new files are not saved. And because Spark deleted the old files first, we lose the existing data. The Spark DataFrame Writer API is not atomic, although it behaves like an atomic operation for append operations.
CONSISTENCY
Distributed systems are usually built on machines with lower availability guarantees. Consistency is a key concern in highly available systems. A system is consistent if all nodes see and return the same data at the same time. There are several consistency models; the most common in distributed systems are strong consistency, weak consistency, and eventual consistency. We saw that the Spark writer API’s overwrite mode deletes the old files and then writes the new ones. Between these two states, there is therefore a period of time when no data is available, and if the job fails, we lose data. This means there is no smooth transition between the two states. This is the typical atomicity problem of the Spark overwrite operation, and it also undermines the consistency of the data. In other words, Spark’s write modes do not support consistency.
Isolation and Durability in Spark
Isolation means separation: separation from any other concurrent operation. Suppose we are writing to a data set that has not yet been committed, and another concurrent process is reading or writing the same data set. According to the isolation property, the other process should not be affected. Typical databases provide different isolation levels, such as read committed and serializable. Although Spark has task-level and job-level commits, it cannot provide proper isolation because its write operations lack atomicity.
Finally, durability: it is the committed state/data that the system persists, so that the data remains available in the correct state even after failures and system restarts. Durability is provided by the storage layer; in the case of a Spark application, that is a function of HDFS and S3/OBS. However, when Spark does not perform a proper commit due to the lack of atomicity, we cannot count on durability either.
If we look closely, all of these ACID properties are correlated. Because of the lack of atomicity, we lose consistency and isolation, and because of the lack of isolation, we lose durability.
Lack of Schema Enforcement
We know that Spark applies the schema on read. So when we write data, it does not throw an exception if the schema does not match. Let’s try to understand this with an example. Suppose we have an input CSV file whose records include an integer “Cost” column. The following program reads the CSV and converts it to a DataFrame.
The program reads from the CSV file, writes the data back in Parquet format, and displays it.
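A minimal sketch of that first load is shown below; the file paths, the absence of a header row, and the use of schema inference are assumptions made for illustration.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("schema-on-read-demo").getOrCreate()

// First batch: the "Cost" column (inferred as _c2) contains integer values
val df1 = spark.read
  .option("inferSchema", "true")   // Spark infers _c2 as an integer
  .csv("/tmp/sales_batch1.csv")

df1.write.mode("overwrite").parquet("/tmp/sales_parquet")
spark.read.parquet("/tmp/sales_parquet").show()
```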
Let’s read another input CSV file in which the “Cost” column has decimal values instead of integers, and append it to the file written above.
In this case, our program reads the CSV and writes to the Parquet format without any exception. The program only throws an error when we try to display the DataFrame:
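The sketch below continues the example above under the same assumptions (hypothetical file paths, inferred schema): the append itself succeeds, and the failure only appears when an action reads the mixed files back.

```scala
// Second batch: the "Cost" column (_c2) now contains decimal values
val df2 = spark.read
  .option("inferSchema", "true")   // _c2 is inferred as double this time
  .csv("/tmp/sales_batch2.csv")

// Spark does not validate the schema on write, so the append succeeds
df2.write.mode("append").parquet("/tmp/sales_parquet")

// The error only surfaces when an action forces Spark to read the
// incompatible Parquet files together
spark.read.parquet("/tmp/sales_parquet").show()
```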
This is because Spark never validates the schema during write operations. The schema of the “Cost” column is inferred as integer during the first load, and during the second write the double data is appended without any problem. When we read the appended data and invoke an action, the error is raised because of the schema incompatibility.
How to overcome the above drawbacks of Spark
The above issues can be managed if we plug in CarbonData as an additional layer on top of the storage solution used with Apache Spark.
What is CarbonData
Because the Hadoop Distributed File System (HDFS) and object stores behave like file systems, they are not designed to provide transaction support. Implementing transactions in a distributed processing environment is a challenging problem; for example, implementations often must lock access to the storage system at the expense of overall throughput. Storage solutions such as Apache CarbonData effectively address these ACID requirements for data lakes by pushing the transaction semantics and rules into the file format itself, or into a combination of metadata and file format. CarbonData acts as an intermediary layer between Apache Spark and the storage system, and responsibility for ACID compliance now rests with CarbonData. The underlying storage system can be anything: HDFS, Huawei OBS, Amazon S3, or Azure Blob Storage.
Some of the key features CarbonData provides for Spark are:
1. ACID transactions.
2. Schema enforcement/Schema validation.
3. Enables Updates, Deletes and Merge (see the sketch after this list).
4. Automatic data indexing.
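As a sketch of the third point, CarbonData exposes SQL-style update and delete statements on its tables. The table and column names below are hypothetical, the statements assume a SparkSession (spark) configured with the CarbonData extensions, and merge is omitted here.

```scala
// Hypothetical CarbonData table "sales_carbon" with columns id, product, cost.
// Update and delete are issued as SQL through the Spark session.
spark.sql("""
  UPDATE sales_carbon
  SET (cost) = (cost * 1.1)
  WHERE product = 'ProductA'
""")

spark.sql("""
  DELETE FROM sales_carbon
  WHERE cost < 0
""")
```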
CarbonData in Apache Spark: ACID
In the code snippet above, the first part of the code mimics Job 1, creating 100 records and saving them to the ACIDpath directory. The second part mimics Job 2, which tries to overwrite the existing data but throws an exception partway through.
The result of both jobs is data loss: in the end, we lose the data created by the first job. Now let’s change the code to use CarbonData, as shown below.
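The sketch below reworks the earlier experiment to write through CarbonData. It assumes the CarbonData Spark integration is on the classpath, that the session is created with CarbonData’s SQL extensions, and that the table name acid_table is a placeholder.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
  .appName("carbon-acid-demo")
  .config("spark.sql.extensions", "org.apache.spark.sql.CarbonExtensions")
  .getOrCreate()
import spark.implicits._

// Job 1: create 100 records and save them into a CarbonData table
spark.range(1, 101).toDF("value")
  .write.format("carbondata")
  .option("tableName", "acid_table")
  .mode(SaveMode.Overwrite)
  .save()

spark.sql("SELECT count(*) FROM acid_table").show()   // 100 rows, as expected

// Job 2: try to overwrite, but fail partway through, as in the earlier example
try {
  spark.range(1, 101).as[Long]
    .map { id => if (id > 50) throw new RuntimeException("simulated failure"); id }
    .toDF("value")
    .write.format("carbondata")
    .option("tableName", "acid_table")
    .mode(SaveMode.Overwrite)
    .save()
} catch {
  case e: Exception => println("Job 2 failed: " + e.getMessage)
}

// The failed job never gets a successful entry in the TableStatus file,
// so readers still see the old segment.
spark.sql("SELECT count(*) FROM acid_table").show()   // still 100 rows
```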
Execute the first job and count the number of rows. As expected, you get 100 rows.
If you examine the data directory, you will see a Snappy-compressed CarbonData file. The data file holds the 100 rows in a columnar encoded format. You will also see a metadata directory containing the TableStatus file. Now run the second job. What should you expect from it? As mentioned earlier, this job should try to do the following:
1. Delete the previous file.
2. Create a new file and start writing records.
3. Throw a runtime exception in the middle of the job.
Without CarbonData, as we observed earlier, the exception prevents the job-level commit and the existing data is lost.
But now, if you execute the second job, you still get an exception. Then count the rows again: you get 100, and no old records are lost. It looks like CarbonData has made the overwrite atomic. Take a look at the data directory and you will find two CarbonData files.
One file was created by the first job, and the other by Job 2. Instead of deleting the old file, Job 2 creates a new file and starts writing data into it. This approach leaves the old data state unchanged, which is why we do not lose the old data: the old files stay the same. The new, incomplete file is also there, but the data in it is never read. This logic is hidden in the metadata directory and managed using the TableStatus file. Because the second job failed in the middle, it could not create a successful entry in the TableStatus file, and the read API does not read files whose entries in the TableStatus file are marked for deletion.
This time, let’s write the code logic without an exception, overwriting the old 100 records with 50 records.
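A sketch under the same assumptions as before (the placeholder table name acid_table and the CarbonData-enabled session):

```scala
// Overwrite the existing 100 rows with 50 new rows; no simulated failure this time
spark.range(1, 51).toDF("value")
  .write.format("carbondata")
  .option("tableName", "acid_table")
  .mode(SaveMode.Overwrite)
  .save()

spark.sql("SELECT count(*) FROM acid_table").show()   // 50 rows
```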
Now the record count shows 50, as expected. So you have overwritten the older data set of 100 rows with a new data set of 50 rows.
CarbonData addresses the data consistency problem by bringing metadata management to Apache Spark and making the Spark DataFrame Writer API atomic. With the consistency issue resolved, CarbonData is also able to provide update and delete functionality.
Spark With CarbonData: Schema Enforcement
Let’s consider a simple user scenario where data arrives in batches for transformation. For the sake of simplicity here, let’s assume that there are only two batches of data, and that the second batch carries some column data of a different type than the first.
To start the experiment, let’s read the first batch (Table 1) and write the data both with and without CarbonData; in both cases we can use “overwrite” mode.
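A minimal sketch of this first write, assuming hypothetical CSV paths, schema inference, and a placeholder CarbonData table name sales_carbon:

```scala
// First batch: the cost column (_c2) is inferred as an integer
val batch1 = spark.read
  .option("inferSchema", "true")
  .csv("/tmp/batch1.csv")

// Without CarbonData: plain Parquet write
batch1.write.mode("overwrite").parquet("/tmp/sales_parquet")

// With CarbonData: write into a CarbonData table
batch1.write.format("carbondata")
  .option("tableName", "sales_carbon")
  .mode("overwrite")
  .save()
```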
Now let’s read the second batch, in which the cost column contains double-typed data, and write the DataFrame to both Parquet and the CarbonData table (note: _c2 was inferred as an integer, and we are now trying to append double-typed data). There is no problem appending the mismatched data to Parquet, but when the program tries to append the same data to the CarbonData table, it throws an error:
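The sketch below continues under the same assumptions; the CarbonData failure is the schema-validation-on-write behaviour described in the next paragraph.

```scala
// Second batch: _c2 now contains double values
val batch2 = spark.read
  .option("inferSchema", "true")
  .csv("/tmp/batch2.csv")

// Parquet accepts the mismatched schema silently on write
batch2.write.mode("append").parquet("/tmp/sales_parquet")

// CarbonData validates the schema on write and rejects the incompatible append
batch2.write.format("carbondata")
  .option("tableName", "sales_carbon")
  .mode("append")
  .save()   // throws: _c2 no longer matches the table schema
```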
Therefore, based on the above experiments, we can see that CarbonData validates the schema before writing to the underlying storage; in other words, CarbonData performs schema validation on write. If the types are incompatible, CarbonData cancels the transaction. This helps catch problems at the start, rather than mixing bad data with good data and then trying to figure out the root cause.
Original English article: brijoobopanna.medium.com/making-apac…
Author: Brijoobopanna