Spark SQL is one of the most important features in Apache Spark. As a SQL engine for large-scale offline data, Spark SQL is similar to engines such as Presto and Apache Hive. In addition to SQL, Apache Spark also provides a more flexible DataFrame API.

The Spark SQL and Spark DataFrame APIs suit different scenarios.

  • The Spark DataFrame API is more expressive than Spark SQL. For engineers with solid programming skills and a knack for code abstraction, the DataFrame API makes complex data processing problems easier to solve.

  • Spark SQL syntax is almost identical to Apache Hive SQL syntax and can meet most data processing requirements. With the DataFrame API, we need to create a Scala project, compile the code, package it, and finally submit it to the cluster via spark-submit; Spark SQL, on the other hand, requires no build step or intermediate artifacts and can be submitted to the cluster via JDBC, for example.

We need to customize Spark SQL

However, with Spark SQL there are always edge cases to deal with, such as:

“I have a table with 200 columns, and I want to eliminate two columns and save the remaining 198 columns in a new table.”

Using the Spark DataFrame API, this problem can be solved as follows:
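A minimal sketch with the public Dataset.drop API (the table names tbl and tbl_new are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("drop-columns").getOrCreate()

// Dataset.drop accepts several column names at once; the remaining 198 columns are kept.
spark.table("tbl")
  .drop("col_to_exclude_1", "col_to_exclude_2")
  .write
  .saveAsTable("tbl_new")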

In SQL, the 198 column names have to be written out in the statement, which is hard to read and breaks whenever new columns are added:

select col_1, col_2, col_3, col_4, col_5, col_6, col_7, col_8, col_9, …

Spark SQL does offer a HiveQL-compatible workaround, but it requires enabling a configuration item and uses the following hard-to-read syntax:

set spark.sql.parser.quotedRegexColumnNames=true;

select `(col_to_exclude_1)?+.+` from tbl

And if someone asked me how to extend this to exclude two columns, I'm afraid I couldn't answer.

At Tubi, we solve this problem by customizing Spark SQL syntax:

// select all columns excluding `col_to_exclude_1`

select all_columns_except("tbl", "col_to_exclude_1")

// select all columns excluding `col_to_exclude_1` and `col_to_exclude_2`

select all_columns_except("tbl", "col_to_exclude_1", "col_to_exclude_2")

Macros let us work around Spark SQL syntax that is hard to use. We can also use macros to enhance existing Spark SQL features, such as:

select * from json.`s3://bucket/dir/dt=2020-12-12/`

The syntax above is a built-in feature of Spark SQL, but often our data is not organized into directories by date; instead, all files sit in a single directory:

s3://bucket/2020-12-12-13-00-00.json.gz

s3://bucket/2020-12-12-13-01-00.json.gz

s3://bucket/2020-12-12-13-02-00.json.gz

…

s3://bucket/2020-12-13-00-00-00.json.gz

s3://bucket/2020-12-13-00-00-01.json.gz

In this case, we would like to query all data from the 1pm hour on December 12, 2020 with syntax like the following:

select * from json.`s3://bucket/dir/2020-12-12-13*`

Spark SQL does not support this syntax out of the box, but we can add it as custom syntax.

But we don't want to modify the Spark source code or maintain a customized version of Spark at Tubi

To customize Spark SQL syntax without doing that, we took a very lightweight, two-phase approach:

Tubi Spark SQL is the SQL dialect we use at Tubi; by design, every Tubi Spark SQL statement can be parsed by the stock Spark SQL parser.

In the first phase, all macros are expanded. In Tubi Spark SQL, macros take the form of UDF calls, which guarantees that Tubi Spark SQL can be parsed by the Spark SQL parser. Note that macros only borrow the syntactic form of a UDF: a macro is executed in this first phase, and after it is expanded we get a new, plain Spark SQL statement. UDFs can accept columns as arguments, while macros accept only constants. A UDF is typically executed hundreds of millions of times, once per row, while a macro is executed only once, in the first phase.
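To make the contrast concrete, here is a hedged sketch: the first registration below uses Spark's real spark.udf.register API and runs at query time, once per row, while the SqlMacro trait is only a hypothetical shape for a macro, evaluated once before parsing and returning SQL text to splice into the statement.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("udf-vs-macro").getOrCreate()

// A UDF: registered with Spark and evaluated during query execution, once per row.
spark.udf.register("mask", (s: String) => s.take(3) + "***")

// A macro (hypothetical trait, names are illustrative): accepts only constant arguments,
// runs exactly once in the first phase, and returns SQL text that replaces the call site.
trait SqlMacro {
  def name: String
  def expand(constantArgs: Seq[String]): String
}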

In the second phase, we analyze the syntax tree to recognize the various Spark SQL patterns. For example, if the statement is a DROP DATABASE or DROP TABLE, we route it to ForbiddenDropCommand, which does nothing except tell the user that this type of SQL cannot be executed in our SQL executor. Regular Spark SQL is routed to SparkSQLCommand, and for other types of SQL we provide the necessary customizations. For example, Delta Lake currently offers rich programmatic interfaces, but many of them have no corresponding SQL syntax; by recognizing the relevant patterns, we can implement that SQL syntax on top of Delta's programmatic interfaces.
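As a rough sketch of the routing (the command names follow the prose above; SqlPattern and the match itself are our simplification, since the real recognition walks the Antlr syntax tree):

// Simplified, hypothetical sketch of phase-2 routing.
sealed trait SqlPattern
object SqlPattern {
  case object Drop     extends SqlPattern   // DROP DATABASE / DROP TABLE
  case object TubiJson extends SqlPattern   // select ... from tubi_json.`...`
  case object Regular  extends SqlPattern   // everything else
}

sealed trait SqlCommand { def run(sql: String): Unit }

object ForbiddenDropCommand extends SqlCommand {
  def run(sql: String): Unit =
    throw new UnsupportedOperationException("DROP statements cannot be executed in this SQL executor")
}

object TubiJSONCommand extends SqlCommand {
  def run(sql: String): Unit = ()   // load the matching files, then run the rewritten query
}

object SparkSQLCommand extends SqlCommand {
  def run(sql: String): Unit = ()   // delegate to spark.sql(sql)
}

def route(pattern: SqlPattern): SqlCommand = pattern match {
  case SqlPattern.Drop     => ForbiddenDropCommand
  case SqlPattern.TubiJson => TubiJSONCommand
  case SqlPattern.Regular  => SparkSQLCommand
}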

Case study: all_columns_except

For the all_columns_except example, the actual execution is as follows:

// Suppose tbl has six columns: col1, col2, col3, col4, col5, col6

select all_columns_except("tbl", "col1", "col5") from tbl

// --> Phase 1: expand the all_columns_except macro

select col2, col3, col4, col6 from tbl

// --> Phase 2: route to SparkSQLCommand

Case study: tubi_json

For the tubi_json example with wildcard paths, the actual execution process is as follows:

select * from tubi_json.`s3://bucket/dir/2020-12-13*`

// --> Phase 1: no macros to expand, so the SQL is unchanged

select * from tubi_json.`s3://bucket/dir/2020-12-13*`

// --> Phase 2: route to TubiJSONCommand and execute the related logic:

1. Load the data from `s3://bucket/dir/2020-12-13*` and create a temporary view `temp_view` for it

2. Run spark.sql("select * from temp_view")
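A minimal sketch of those two steps using only public Spark APIs (the real TubiJSONCommand also has to rewrite the FROM clause so the query reads from the temporary view):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("tubi-json").getOrCreate()

// 1. spark.read.json accepts glob patterns, so all matching files are loaded at once.
spark.read.json("s3://bucket/dir/2020-12-13*").createOrReplaceTempView("temp_view")

// 2. Run the rewritten query against the temporary view.
val result = spark.sql("select * from temp_view")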

In this way, we can customize Spark SQL with the following advantages:

  1. We do not need to modify the Spark source code or maintain a customized version of Spark.

  2. No custom Antlr 4 grammar file is required; the existing Spark SQL parser and grammar definitions are reused.

  3. Custom syntax features can be implemented and upgraded painlessly on top of stock Spark.

The parser code generated by Antlr 4 is very complex, and code written directly against the generated classes is hard to maintain

The hardest parts of this problem are macro replacement and SQL pattern recognition.

For macro replacement, Antlr 4's token rewriter API (TokenStreamRewriter) is a good fit.
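As a sketch of how this can work: SqlBaseLexer, SqlBaseParser, and SqlBaseBaseListener below stand for the classes Antlr generates from Spark's grammar (their exact names and rule names differ between Spark versions), and expandAllColumnsExcept is a hypothetical helper that reads the constant arguments, looks up the table schema, and builds the replacement column list.

import org.antlr.v4.runtime.{CharStreams, CommonTokenStream, TokenStreamRewriter}
import org.antlr.v4.runtime.tree.ParseTreeWalker

def expandMacros(sql: String): String = {
  val tokens   = new CommonTokenStream(new SqlBaseLexer(CharStreams.fromString(sql)))
  val parser   = new SqlBaseParser(tokens)
  val tree     = parser.singleStatement()          // Spark's top-level grammar rule
  val rewriter = new TokenStreamRewriter(tokens)

  ParseTreeWalker.DEFAULT.walk(new SqlBaseBaseListener {
    override def exitFunctionCall(ctx: SqlBaseParser.FunctionCallContext): Unit =
      if (ctx.getText.toLowerCase.startsWith("all_columns_except(")) {
        // Replace the macro call's tokens with the expanded column list
        // (expandAllColumnsExcept is a hypothetical helper).
        rewriter.replace(ctx.getStart, ctx.getStop, expandAllColumnsExcept(ctx))
      }
  }, tree)

  rewriter.getText   // a plain Spark SQL statement with all macros expanded
}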

For pattern recognition, we need to analyze the abstract syntax tree and route SQL that matches a particular pattern to a particular Command. Antlr 4 provides two ways to traverse the syntax tree built from SQL: Listener and Visitor. The Spark Catalyst module uses the Visitor pattern, which is fine for a large project like Apache Spark, but for us both the Visitor and the Listener patterns were too heavyweight. For one-off syntax tree analysis we could also use the XPath support that Antlr provides, but it has drawbacks similar to regular expressions for text processing: hard to read and hard to maintain.

The Scala standard library provides a number of easy-to-use operators, such as map/filter/count/find. The API design of Apache Spark is also influenced by these operators: both the low-level RDD API and the higher-level DataFrame API implement them.

We can design a similar set of operators for the abstract syntax tree built from SQL:

// e.g. find table references of the form `s3://xxxx/yyyy*` or tubi_json.`s3://xxxx/yyyy*` in the tree
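A hedged sketch of what such operators might look like, written against Antlr's ParseTree interface (the ParseTreeOps object and its methods are our illustration, not an existing API):

import org.antlr.v4.runtime.tree.ParseTree

object ParseTreeOps {
  private def children(node: ParseTree): Seq[ParseTree] =
    (0 until node.getChildCount).map(i => node.getChild(i))

  // Depth-first traversal collecting every node that satisfies the predicate.
  def filter(node: ParseTree)(p: ParseTree => Boolean): Seq[ParseTree] =
    (if (p(node)) Seq(node) else Seq.empty) ++ children(node).flatMap(filter(_)(p))

  def find(node: ParseTree)(p: ParseTree => Boolean): Option[ParseTree] =
    filter(node)(p).headOption

  def count(node: ParseTree)(p: ParseTree => Boolean): Int =
    filter(node)(p).size
}

// e.g. detect a tubi_json table reference anywhere in the parsed statement:
// ParseTreeOps.find(tree)(_.getText.toLowerCase.startsWith("tubi_json."))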

At the most recent Scala Meetup on this topic, we covered the design, implementation, and technical details of this simplification using four such operators. If you are interested, please watch part 3 of the 2020-09 Online Scala Meetup; a replay of the video is also available on Tubi's official Bilibili account.

In addition, I will share more details in the Big Data Architecture forum at the DataFun year-end conference, and I welcome everyone to join the discussion there.

Author: Shen Da @tubi

Edited by: Yang Yujia @tubi