The second article in this series, the Spark Core programming guide, covered Spark's Core module. This article discusses another important Spark module, Spark SQL, which grew out of Shark and was released in May 2014. As the name suggests, this module is the relational API provided by Spark and implements SQL on Spark. If you are familiar with SQL, you can use it to perform complex data processing on Spark. In this article, you will learn about:

  • Spark SQL overview
  • DataFrame API & DataSet API
  • Catalyst Optimizer
  • Spark SQL basic operations
  • Spark SQL data sources
  • Converting an RDD to a DataFrame
  • Thrift Server and Spark SQL CLI

Spark SQL overview

Spark SQL is the Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL give Spark more information about the structure of both the data and the computation being performed, and Spark SQL uses this extra information to perform additional optimizations. There are several ways to use Spark SQL, including SQL, the DataFrame API, and the Dataset API. Importantly, the same execution engine is used regardless of which API or language expresses the computation. This unification means that developers can easily switch back and forth between the different APIs, which makes data processing more flexible.
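To make the point about the shared engine concrete, here is a minimal sketch, assuming Spark 2.x or later is on the classpath and a local master is acceptable. The object name SameEngineSketch and the in-memory sample data are hypothetical and exist only for illustration. It expresses one query through SQL and once more through the DataFrame API, prints both physical plans, and checks that the results agree.

```scala
import org.apache.spark.sql.SparkSession

object SameEngineSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("SameEngineSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical in-memory data, used only for illustration
    val people = Seq(("Andy", 30), ("Justin", 19)).toDF("name", "age")
    people.createOrReplaceTempView("people")

    // The same query, expressed through SQL and through the DataFrame API
    val viaSql = spark.sql("SELECT name FROM people WHERE age > 21")
    val viaApi = people.filter($"age" > 21).select("name")

    // Both queries are planned by Catalyst; print the physical plans to compare them
    viaSql.explain()
    viaApi.explain()

    // Both queries return the same rows
    assert(viaSql.as[String].collect().toSeq == viaApi.as[String].collect().toSeq)

    spark.stop()
  }
}
```

The two plans printed by explain() can be compared side by side to see how the engine treats each form of the query.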

DataFrame API & DataSet API

DataFrame API

A DataFrame represents an immutable, distributed collection of data. Its core purpose is to let developers focus on what to do rather than how to do it, leaving optimization to the Spark framework. A DataFrame carries schema information, which means it can be treated as data with field names and types, similar to a table in a relational database but with richer optimizations under the hood. Once you have created a DataFrame, you can use SQL to process it.

A DataFrame can be constructed from a variety of data sources, such as structured data files, tables in Hive, external databases, or existing RDDs. The DataFrame API supports Scala, Java, Python, and R. In Scala and Java, a DataFrame is represented by a DataSet of Row objects; that is, DataSet[Row] is the same as DataFrame.

DataSet API

DataSet is a new interface added in Spark 1.6. It extends DataFrame and combines the advantages of RDDs (strong typing and support for powerful lambda functions) with the optimized execution engine of Spark SQL. You can build a DataSet from JVM objects and then manipulate it with functional transformations (map, flatMap, filter). Note that the Dataset API is available in Scala and Java; Python does not support it.

In addition, the DataSet API can reduce memory usage. Because the Spark framework knows the data structure of a DataSet, it can save a lot of memory when persisting it.

Catalyst Optimizer

In Catalyst, there are two types of plans:

  • Logical Plan: Defines the computation on a data set but not how to carry it out. Each logical plan defines the set of attributes (query fields) and constraints (WHERE conditions) that the user code needs, but does not define how to execute them. The details are shown in the figure below:

  • Physical Plan: Generated from a logical plan, a physical plan defines how the computation is performed and is executable. For example, a JOIN in a logical plan may be converted to a sort-merge join in a physical plan. Note that Spark generates multiple physical plans and selects the one with the lowest cost. The details are shown in the figure below:

In Spark SQL, all operations are converted into an AST (Abstract Syntax Tree), which is then passed to the Catalyst optimizer. The optimizer is built on Scala's functional programming features, and Catalyst supports both rule-based and cost-based optimization strategies.

Query planning in Spark SQL consists of four phases (see the following figure):

  • 1. Analysis
  • 2. Logical optimization
  • 3. Physical planning
  • 4. Code generation, which compiles parts of the query into Java bytecode

Note: In the physical planning phase, Catalyst generates multiple plans, computes the cost of each, and compares them; this is the cost-based strategy. The other phases use rule-based optimization.

Analysis

Unresolved Logical Plan -> Logical Plan. Query planning in Spark SQL starts either from the AST returned by the SQL parser or from a DataFrame object built through the API. In both cases the plan may contain unresolved attribute references (a queried field may not exist, or its data type may be unknown). For example, in the query SELECT COL FROM SALES, neither the type of COL nor whether it is a valid field is known until the SALES table is looked up. An attribute is called unresolved when its type is not yet known or it has not been matched to an input table. Spark SQL uses Catalyst rules and a Catalog object (which gives access to the table information of the data sources) to resolve these attributes. An Unresolved Logical Plan tree is built first, a series of rules is then applied to it, and finally a Logical Plan is generated.
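The resolution step can be sketched in plain Scala. This is a toy model under stated assumptions, not Catalyst's real classes: UnresolvedAttr, ResolvedAttr, ToyAnalyzer, and the catalog contents are all hypothetical names invented for illustration. It looks an attribute up in a catalog and either attaches a type to it or reports why it cannot be resolved.

```scala
// Toy model of the analysis phase; these are NOT Catalyst's real classes.
sealed trait Attr
case class UnresolvedAttr(name: String) extends Attr
case class ResolvedAttr(name: String, dataType: String) extends Attr

object ToyAnalyzer {
  // Hypothetical catalog: table name -> (column name -> column type)
  val catalog: Map[String, Map[String, String]] =
    Map("sales" -> Map("col" -> "int", "region" -> "string"))

  // Resolve one attribute against a table, or explain why it cannot be resolved.
  // Names are compared case-insensitively, as SQL identifiers usually are.
  def resolve(table: String, attr: UnresolvedAttr): Either[String, ResolvedAttr] =
    for {
      columns <- catalog.get(table.toLowerCase)
        .toRight(s"Table not found: $table")
      dataType <- columns.get(attr.name.toLowerCase)
        .toRight(s"Cannot resolve column '${attr.name}' in table $table")
    } yield ResolvedAttr(attr.name.toLowerCase, dataType)
}
```

For the query SELECT COL FROM SALES, ToyAnalyzer.resolve("SALES", UnresolvedAttr("COL")) yields a resolved attribute of type int, while a column that is missing from the catalog yields a Left carrying an error message.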

Logical optimization

Logical Plan -> Optimized Logical Plan. The logical optimization phase applies rule-based optimizations, such as predicate pushdown and projection pruning, to the logical plan. After this series of optimizations, an Optimized Logical Plan is generated.
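Rule-based optimization amounts to pattern matching on a tree and rewriting the nodes that match. The sketch below is a toy in plain Scala, not Catalyst code: the Expr types and ToyOptimizer are hypothetical, and it uses constant folding and arithmetic simplification as simpler stand-ins for the predicate pushdown and projection pruning rules mentioned above.

```scala
// Toy expression tree; these are NOT Catalyst's real classes.
sealed trait Expr
case class Lit(value: Int) extends Expr
case class Col(name: String) extends Expr
case class Add(left: Expr, right: Expr) extends Expr
case class Mul(left: Expr, right: Expr) extends Expr

object ToyOptimizer {
  // Apply a rule to every node of the tree, children first (bottom-up)
  def transformUp(e: Expr)(rule: PartialFunction[Expr, Expr]): Expr = {
    val withNewChildren = e match {
      case Add(l, r) => Add(transformUp(l)(rule), transformUp(r)(rule))
      case Mul(l, r) => Mul(transformUp(l)(rule), transformUp(r)(rule))
      case leaf      => leaf
    }
    // Rewrite the node if the rule matches it, otherwise keep it unchanged
    rule.applyOrElse(withNewChildren, (unchanged: Expr) => unchanged)
  }

  // Rule 1: evaluate operations whose inputs are both literals
  val constantFolding: PartialFunction[Expr, Expr] = {
    case Add(Lit(a), Lit(b)) => Lit(a + b)
    case Mul(Lit(a), Lit(b)) => Lit(a * b)
  }

  // Rule 2: remove arithmetic identities (x + 0, x * 1)
  val simplify: PartialFunction[Expr, Expr] = {
    case Add(e, Lit(0)) => e
    case Add(Lit(0), e) => e
    case Mul(e, Lit(1)) => e
    case Mul(Lit(1), e) => e
  }

  def optimize(e: Expr): Expr =
    transformUp(e)(constantFolding.orElse(simplify))
}
```

For example, ToyOptimizer.optimize(Add(Col("x"), Add(Lit(1), Lit(2)))) rewrites the tree to Add(Col("x"), Lit(3)). Expressing each rule as a partial function keeps it small and lets rules be composed with orElse.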

Physical planning

Optimized Logical Plan -> Physical Plan. In the physical planning phase, Spark SQL generates multiple physical execution plans from the optimized logical plan. A cost model is used to compute the cost of each physical plan, and one of them is selected. At this stage, Spark SQL uses a broadcast join if it determines that a table is small enough to fit in memory.

Note that the physical planner also applies rule-based optimizations, such as pipelining projections and filters into a single Spark map operation. In addition, it can push operations from the logical plan down into data sources that support predicate pushdown or projection pushdown.
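The plans produced by these phases can be inspected with explain. The following is a minimal sketch, assuming Spark 2.x or later on the classpath and a local master; the object name ExplainSketch and the sample data are hypothetical. The broadcast function is used here as an explicit hint for illustration; without it, Spark decides on its own based on the estimated table size and the spark.sql.autoBroadcastJoinThreshold setting.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.broadcast

object ExplainSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ExplainSketch")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical data, used only for illustration
    val orders = Seq((1, "book", 12.5), (2, "pen", 1.2), (1, "lamp", 30.0))
      .toDF("customer_id", "item", "price")
    val customers = Seq((1, "Andy"), (2, "Justin")).toDF("id", "name")

    val joined = orders
      .join(broadcast(customers), $"customer_id" === $"id") // hint: broadcast the small side
      .filter($"price" > 10)
      .select("name", "item")

    // extended = true prints the parsed, analyzed, and optimized logical plans,
    // followed by the selected physical plan
    joined.explain(true)

    spark.stop()
  }
}
```

Reading the four sections of the output from top to bottom follows the same order as the phases described above.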

Code generation

The final stage of query optimization is to generate Java bytecode, which is done using Scala quasiquotes.

After the above analysis, we have a preliminary understanding of Catalyst Optimizer. How do other components of Spark interact with Catalyst Optimizer? The details are shown in the figure below:


ML Pipelines, Structured Streaming and GraphFrames all use DataFrame/Dataset APIs and benefit from Catalyst Optimiser.

Quick Start

Create SparkSession

SparkSession is the programming entry point for the Dataset and DataFrame APIs and has been available since Spark 2.0. It unifies the earlier HiveContext and SQLContext, both of which are retained for compatibility. Having a single entry point makes Spark easier to use. The following code shows how to create a SparkSession:

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// Import implicit conversions, such as converting an RDD to a DataFrame
import spark.implicits._

Create a DataFrame

After a SparkSession is created, you can use it to create DataFrames from an existing RDD, a Hive table, or other data sources. The following example creates a DataFrame from a JSON file:

/**
 * {"name":"Michael"}
 * {"name":"Andy", "age":30}
 * {"name":"Justin", "age":19}
 */
val df = spark.read.json("E://people.json")
// Display the contents of the DataFrame
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Basic DataFrame operations

Once the DataFrame is created, you can perform a series of operations on it, as shown in the following code:

// Print the schema of the DataFrame
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the name column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everyone, with age incremented by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

Use SQL queries in your program

The operations above use the domain-specific language (DSL). You can also operate on the DataFrame directly with SQL, as shown below:

// Register the DataFrame as a SQL temporary view.
// This creates a session-scoped temporary view whose lifetime is tied to the SparkSession that created it;
// when that session ends, the view disappears.
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Global Temporary View

The view above is a temporary view, which is scoped to the SparkSession that created it. If you want a view that is shared among all sessions, create a global temporary view as follows:

// Register the DataFrame as a global temporary view.
// Its lifetime is tied to the Spark application: it is dropped automatically when the application ends.
// A global temporary view is shared across SparkSessions and lives in the system-reserved database `global_temp`,
// so queries must use the qualified name, such as `SELECT * FROM global_temp.view1`.
df.createGlobalTempView("people")

// Global temporary views belong to the `global_temp` database
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// A global temporary view is visible from other SparkSessions
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Create a DataSet

A DataSet is similar to an RDD, but where an RDD uses Java serialization or Kryo, a DataSet uses an Encoder to serialize objects for processing or for transmission over the network. The following is an example of creating a DataSet:

case class Person(name: String, age: Long)

// Create a DataSet from a case class instance
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are provided automatically by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// A DataFrame can be converted to a DataSet by calling the as method with a target class
val path = "E://people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

Converting an RDD to a DataFrame

Spark SQL supports two different ways to convert an RDD to a DataFrame. The first uses reflection to infer the schema of an RDD that contains objects of a specific type. This reflection-based approach leads to more concise code and works well when you already know the schema while writing the Spark application. The second approach builds a schema through a programmatic interface and then applies it to an existing RDD. This approach is more verbose, but it lets you construct a DataFrame when the columns and their types are not known until runtime.

The following examples use the data set people.txt:

Tom, 29
Bob, 30
Jack, 19

Using reflection

The Scala interface of Spark SQL automatically converts an RDD containing case classes to a DataFrame. The case class defines the schema of the table: the names of its parameters are read by reflection and become the column names.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

object RDD2DF_m1 {
  // Case class that defines the schema
  case class Person(name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m1")
      .master("local")
      .getOrCreate()
    // Silence Spark and Hadoop logging so the printed results are easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession): Unit = {
    // Import implicit conversions from RDD to DataFrame
    import spark.implicits._
    // Create an RDD from a text file and convert it to a DataFrame
    val peopleDF = spark.sparkContext
      .textFile("file:///E:/people.txt")
      .map(_.split(","))
      .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
      .toDF()
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    // Run the SQL statement
    val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")
    // Access a column by field index
    teenagersDF.map(teenager => "Name: " + teenager(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // |Name: Jack|
    // +----------+

    // Access a column by field name
    teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
    // +----------+
    // |     value|
    // +----------+
    // |Name: Jack|
    // +----------+
  }
}

By building a schema

Creating a DataFrame by building a schema consists of three main steps:

  • 1. Create an RDD of Rows from the original RDD
  • 2. Use StructType to create a schema
  • 3. Use createDataFrame to apply the schema to the RDD of Rows

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object RDD2DF_m2 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("RDD2DF_m2")
      .master("local")
      .getOrCreate()
    // Silence Spark and Hadoop logging so the printed results are easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runRDD2DF(spark)
  }

  private def runRDD2DF(spark: SparkSession): Unit = {
    // Import implicit conversions (needed for the Encoder used by map below)
    import spark.implicits._
    // Create the original RDD
    val peopleRDD = spark.sparkContext.textFile("E:/people.txt")
    // Step 1: convert the original RDD to an RDD of Rows
    val rowRDD = peopleRDD
      .map(_.split(","))
      .map(attributes => Row(attributes(0), attributes(1).trim.toInt))
    // Step 2: create the schema
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)
    ))
    // Step 3: create the DataFrame
    val peopleDF = spark.createDataFrame(rowRDD, schema)
    // Register the DataFrame as a temporary view
    peopleDF.createOrReplaceTempView("people")
    // Run the SQL statement
    val results = spark.sql("SELECT name FROM people")
    // Access a column by field index
    results.map(attributes => "Name: " + attributes(0)).show()
    // +----------+
    // |     value|
    // +----------+
    // | Name: Tom|
    // | Name: Bob|
    // |Name: Jack|
    // +----------+
  }
}

Spark SQL data source

Spark SQL supports operating on a variety of data sources through the DataFrame interface. A DataFrame can be operated on with relational transformations or registered as a temporary view. Common data sources include the following:

File data source

  • Parquet file
  • JSON file
  • CSV file
  • ORC file

import org.apache.spark.sql.{SaveMode, SparkSession}

private def runBasicDataSourceExample(spark: SparkSession): Unit = {
  /**
   * Read a Parquet file and write the result to a Parquet file
   */
  val usersDF = spark
    .read
    .load("E://users.parquet")
  usersDF.show()
  // Save the DataFrame as a Parquet file
  usersDF
    .select("name", "favorite_color")
    .write
    .mode(SaveMode.Overwrite)
    .save("E://namesAndFavColors.parquet")

  /**
   * Read a JSON file and write the result to a Parquet file
   */
  val peopleDF = spark
    .read
    .format("json")
    .load("E://people.json")
  peopleDF.show()
  // Save the DataFrame as a Parquet file
  peopleDF
    .select("name", "age")
    .write
    .format("parquet")
    .mode(SaveMode.Overwrite)
    .save("E://namesAndAges.parquet")

  /**
   * Read a CSV file
   */
  val peopleDFCsv = spark.read.format("csv")
    .option("sep", ";")
    .option("inferSchema", "true")
    .option("header", "true")
    .load("E://people.csv")

  /**
   * Write usersDF to an ORC file
   */
  usersDF.write.format("orc")
    .option("orc.bloom.filter.columns", "favorite_color")
    .option("orc.dictionary.key.threshold", "1.0")
    .option("orc.column.encoding.direct", "name")
    .mode(SaveMode.Overwrite)
    .save("E://users_with_options.orc")

  /**
   * Save peopleDF as a persistent table, usually in Hive
   */
  peopleDF
    .write
    .option("path", "E://warehouse/people_bucketed") // Save path
    .bucketBy(42, "name")                            // Bucket by the name field
    .sortBy("age")                                   // Sort by age
    .saveAsTable("people_bucketed")

  /**
   * Save usersDF as partitioned files, similar to a Hive partitioned table
   */
  usersDF
    .write
    .partitionBy("favorite_color") // Partition field
    .format("parquet")             // File format
    .mode(SaveMode.Overwrite)      // Save mode
    .save("E://namesPartByColor.parquet")

  /**
   * Save usersDF as a table that is both partitioned and bucketed
   */
  usersDF
    .write
    .option("path", "E://warehouse/users_partitioned_bucketed") // Save path
    .partitionBy("favorite_color")                              // Partitioning
    .bucketBy(42, "name")                                       // Bucketing
    .saveAsTable("users_partitioned_bucketed")

  spark.sql("DROP TABLE IF EXISTS people_bucketed")
  spark.sql("DROP TABLE IF EXISTS users_partitioned_bucketed")
}

Save modes

| Scala/Java | Meaning |
| --- | --- |
| SaveMode.ErrorIfExists (default) | If the target file already exists, an exception is thrown |
| SaveMode.Append | If the target file or table already exists, the result is appended to it |
| SaveMode.Overwrite | If the target file or table already exists, its original content is overwritten |
| SaveMode.Ignore | Similar to CREATE TABLE IF NOT EXISTS in SQL: if the target file or table already exists, no operation is performed |

Save as a persistent table

A DataFrame can also be saved as a persistent table in the Hive metastore. Note that this does not require an existing Hive deployment: Spark uses Derby to create a default local Hive metastore. Unlike createOrReplaceTempView, saveAsTable materializes the contents of the DataFrame.

For file-based data sources (text, Parquet, JSON, and so on), you can specify a custom table path when saving, for example df.write.option("path", "/some/path").saveAsTable("t") (the files stored at the specified path are in Parquet format). When the table is dropped, the custom table path and the table data are not removed. If no path is specified, Spark writes the data to the default table path under the warehouse directory (/user/hive/warehouse), and when the table is dropped, the default table path is removed as well.

Hive data source

See the section "Spark SQL integrates Hive" below.

JDBC data source

Spark SQL also includes a data source that can read data from other databases using JDBC. This feature should be preferred over JdbcRDD, because the results are returned as DataFrames, which can easily be processed in Spark SQL or joined with other data sources. The JDBC data source is also easier to use from Java or Python because it does not require the user to provide a ClassTag.

You can use the Data Sources API to load tables from a remote database as a DataFrame or a Spark SQL temporary view. Users can specify JDBC connection properties in the data source options; user and password are typically provided as connection properties for logging in to the data source. In addition to the connection properties, Spark supports the following case-insensitive options:

| Property name | Description |
| --- | --- |
| url | The JDBC URL to connect to |
| dbtable | The JDBC table to read from or write to |
| query | A query statement whose result is read into Spark |
| driver | The class name of the JDBC driver used to connect to this URL |
| partitionColumn, lowerBound, upperBound | If any of these options is specified, all of them must be specified. In addition, numPartitions must be specified |
| numPartitions | The maximum number of partitions that can be used for parallelism in table reads and writes. This also determines the maximum number of concurrent JDBC connections. If the number of partitions to write exceeds this limit, it is reduced to this limit by calling coalesce(numPartitions) before writing |
| queryTimeout | The query timeout; the default is 0 |
| fetchsize | The JDBC fetch size, which determines how many rows to fetch at a time. This can help improve the performance of JDBC drivers |
| batchsize | The JDBC batch size; the default is 1000. This can help improve the performance of JDBC drivers |
| isolationLevel | The transaction isolation level for the current connection. It can be one of NONE, READ_COMMITTED, READ_UNCOMMITTED, REPEATABLE_READ, or SERIALIZABLE, corresponding to the standard transaction isolation levels defined by the JDBC Connection object. The default is READ_UNCOMMITTED. This option applies only to writing |
| sessionInitStatement | A custom SQL statement that is executed after each database session is opened to the remote database and before data is read. Use it to implement session initialization code |
| truncate | A JDBC writer option. When SaveMode.Overwrite is enabled, this option empties the existing target table instead of dropping and recreating it. The default is false |
| pushDownPredicate | Enables or disables predicate pushdown into the JDBC data source. The default is true, in which case Spark pushes filters down to the JDBC data source as far as possible |

import java.util.Properties

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{SaveMode, SparkSession}

object JdbcDatasetExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .appName("JdbcDatasetExample")
      .master("local") // Run locally
      .getOrCreate()
    // Silence Spark and Hadoop logging so the printed results are easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    runJdbcDatasetExample(spark)
  }

  private def runJdbcDatasetExample(spark: SparkSession): Unit = {
    // Load data from a JDBC source using format("jdbc") and options
    val jdbcPersonDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "person")
      .option("user", "root")
      .option("password", "123qwe")
      .load()
    // Print the schema
    jdbcPersonDF.printSchema()
    // Print the data
    jdbcPersonDF.show()

    val connectionProperties = new Properties()
    connectionProperties.put("user", "root")
    connectionProperties.put("password", "123qwe")
    // Load data using the jdbc method
    val jdbcStudentDF = spark
      .read
      .jdbc("jdbc:mysql://localhost/mydb", "student", connectionProperties)
    // Print the schema
    jdbcStudentDF.printSchema()
    // Print the data
    jdbcStudentDF.show()

    // Save the data to a JDBC source using format("jdbc") and options
    jdbcStudentDF.write
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "student2")
      .option("user", "root")
      .option("password", "123qwe")
      .mode(SaveMode.Append)
      .save()

    // Save the data to a JDBC source using the jdbc method
    jdbcStudentDF
      .write
      .mode(SaveMode.Append)
      .jdbc("jdbc:mysql://localhost/mydb", "student2", connectionProperties)
  }
}
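The partitioning options from the option list (partitionColumn, lowerBound, upperBound, numPartitions) are not used in the example above, so here is a minimal sketch of a parallel read. It assumes the same MySQL database and credentials as the example, a MySQL JDBC driver on the classpath, and that the person table has a numeric column named id; the column name, the bounds, and the object name JdbcPartitionedReadSketch are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object JdbcPartitionedReadSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("JdbcPartitionedReadSketch")
      .master("local[*]")
      .getOrCreate()

    val personDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:mysql://localhost/mydb")
      .option("dbtable", "person")
      .option("user", "root")
      .option("password", "123qwe")
      // Assumption: person has a numeric column named id
      .option("partitionColumn", "id")
      // The bounds only determine how the id range is split into partitions;
      // they do not filter rows, so rows outside the range are still read
      .option("lowerBound", "1")
      .option("upperBound", "100000")
      // Upper limit on partitions, and therefore on concurrent JDBC connections
      .option("numPartitions", "4")
      // Rows fetched per round trip
      .option("fetchsize", "1000")
      .load()

    // Number of partitions Spark actually created for the read
    println(personDF.rdd.getNumPartitions)

    spark.stop()
  }
}
```

Choosing bounds close to the real minimum and maximum of the partition column keeps the partitions evenly sized.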

Spark SQL integrates Hive

Spark SQL also supports reading and writing data stored in Apache Hive. However, because Hive has a large number of dependencies, they are not included in the default Spark distribution. If the Hive dependencies can be found on the classpath, Spark loads them automatically. Note that these Hive dependencies must also be present on all worker nodes, because the workers need the Hive serialization and deserialization libraries (SerDes) to access data stored in Hive.

Hive is configured by placing the hive-site.xml, core-site.xml, and hdfs-site.xml files in conf/.

To use Hive, you must instantiate a SparkSession with Hive support, which includes connectivity to a persistent Hive metastore, support for Hive SerDes, and Hive user-defined functions. Users who do not have an existing Hive deployment can still enable Hive support. If hive-site.xml is not configured, the context automatically creates metastore_db in the current directory and creates the directory configured by spark.sql.warehouse.dir, which defaults to spark-warehouse in the directory where the Spark application is started. Note that the hive.metastore.warehouse.dir property in hive-site.xml has been deprecated since Spark 2.0.0; use spark.sql.warehouse.dir to specify the default location of the warehouse instead. You may need to grant write permission to the user who starts the Spark application.

The following example was run locally (to make the printed results easy to see). After it finishes, you will find the spark-warehouse and metastore_db folders created in the project directory E:\IdeaProjects\mySpark. This shows that users who do not have Hive deployed can still enable Hive support. The code can also be packaged and run on a cluster.

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}

object SparkHiveExample {

  case class Record(key: Int, value: String)

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("Spark Hive Example")
      .config("spark.sql.warehouse.dir", "e://warehouseLocation")
      .master("local") // Run locally
      .enableHiveSupport()
      .getOrCreate()

    // Silence Spark and Hadoop logging so the printed results are easier to read
    Logger.getLogger("org.apache.spark").setLevel(Level.OFF)
    Logger.getLogger("org.apache.hadoop").setLevel(Level.OFF)
    import spark.implicits._
    import spark.sql
    // Use Spark SQL syntax to create a table in Hive
    sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    sql("LOAD DATA LOCAL INPATH 'file:///e:/kv1.txt' INTO TABLE src")

    // Queries are expressed in HiveQL
    sql("SELECT * FROM src").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // Aggregate functions are supported
    sql("SELECT COUNT(*) FROM src").show()
    // +--------+
    // |count(1)|
    // +--------+
    // |     500|
    // +--------+

    // The result of a SQL query is a DataFrame, which supports all the usual functions
    val sqlDF = sql("SELECT key, value FROM src WHERE key < 10 AND key > 0 ORDER BY key")

    // The items of a DataFrame are of type Row, which lets you access each column by ordinal
    val stringsDS = sqlDF.map {
      case Row(key: Int, value: String) => s"Key: $key, Value: $value"
    }
    stringsDS.show()
    // +--------------------+
    // |               value|
    // +--------------------+
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // |Key: 0, Value: val_0|
    // ...

    // You can also use a DataFrame to create a temporary view within a SparkSession
    val recordsDF = spark.createDataFrame((1 to 100).map(i => Record(i, s"val_$i")))
    recordsDF.createOrReplaceTempView("records")

    // The DataFrame view can then be joined with tables stored in Hive
    sql("SELECT * FROM records r JOIN src s ON r.key = s.key").show()
    // +---+------+---+------+
    // |key| value|key| value|
    // +---+------+---+------+
    // |  2| val_2|  2| val_2|
    // |  4| val_4|  4| val_4|
    // |  5| val_5|  5| val_5|
    // ...

    // Create a Hive managed Parquet table with HQL syntax instead of the Spark SQL syntax ("USING hive")
    sql("CREATE TABLE IF NOT EXISTS hive_records(key int, value string) STORED AS PARQUET")

    // Read the Hive table as a DataFrame
    val df = spark.table("src")
    // Save the DataFrame as a Hive table in Overwrite mode:
    // if the table already exists, its contents are overwritten
    df.write.mode(SaveMode.Overwrite).saveAsTable("hive_records")
    // Query the data in the table
    sql("SELECT * FROM hive_records").show()
    // +---+-------+
    // |key|  value|
    // +---+-------+
    // |238|val_238|
    // | 86| val_86|
    // |311|val_311|
    // ...

    // Path of the Parquet data files
    val dataDir = "/tmp/parquet_data"
    // spark.range(10) returns a DataSet[Long];
    // write the DataSet directly to Parquet files
    spark.range(10).write.parquet(dataDir)
    // Create an external table in Parquet format in Hive
    sql(s"CREATE EXTERNAL TABLE IF NOT EXISTS hive_ints(key int) STORED AS PARQUET LOCATION '$dataDir'")
    // Query the table created above
    sql("SELECT * FROM hive_ints").show()
    // +---+
    // |key|
    // +---+
    // |  0|
    // |  1|
    // |  2|
    // ...

    // Enable Hive dynamic partitioning
    spark.sqlContext.setConf("hive.exec.dynamic.partition", "true")
    spark.sqlContext.setConf("hive.exec.dynamic.partition.mode", "nonstrict")
    // Use the DataFrame API to create a Hive partitioned table
    df.write.partitionBy("key").format("hive").saveAsTable("hive_part_tbl")

    // The partition column `key` is moved to the end of the schema
    sql("SELECT * FROM hive_part_tbl").show()
    // +-------+---+
    // |  value|key|
    // +-------+---+
    // |val_238|238|
    // | val_86| 86|
    // |val_311|311|
    // ...

    spark.stop()
  }
}

Thrift Server and Spark SQL CLI

Spark SQL can be accessed using JDBC/ODBC or the command line, which allows users to run queries directly using SQL without writing code.

Thrift JDBC/ODBC server

The Thrift JDBC/ODBC server corresponds to HiveServer2 in Hive. You can use Beeline to access the JDBC server. The start-thriftserver.sh script is in the sbin directory of Spark; use it to start the JDBC/ODBC server:

./sbin/start-thriftserver.sh

When you use Beeline to access the JDBC/ODBC server, Beeline asks for a user name and password. In non-secure mode, enter your user name and leave the password blank:

beeline> !connect jdbc:hive2://localhost:10000
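Because the server speaks the HiveServer2 protocol, it can also be queried from code through plain JDBC. The following is a minimal sketch under several assumptions: the Thrift server is running on localhost:10000 in non-secure mode, the Hive JDBC driver is on the classpath, and the object name ThriftJdbcClientSketch is hypothetical. It uses only the standard java.sql API.

```scala
import java.sql.DriverManager

object ThriftJdbcClientSketch {
  def main(args: Array[String]): Unit = {
    // Assumptions: server on localhost:10000, non-secure mode,
    // Hive JDBC driver available on the classpath
    val url = "jdbc:hive2://localhost:10000"
    val user = sys.props.getOrElse("user.name", "anonymous")

    // In non-secure mode the password is left blank
    val conn = DriverManager.getConnection(url, user, "")
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery("SHOW TABLES")
      val columnCount = rs.getMetaData.getColumnCount
      // Print every column of every row, separated by tabs
      while (rs.next()) {
        println((1 to columnCount).map(i => rs.getString(i)).mkString("\t"))
      }
      rs.close()
      stmt.close()
    } finally {
      conn.close()
    }
  }
}
```

Any SQL statement accepted by Spark SQL can be sent the same way, which is how JDBC-based tools typically talk to the server.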

Spark SQL CLI

Spark SQL CLI is a convenient tool for running the Hive Metastore service in local mode and executing queries entered from the CLI. Note that the Spark SQL CLI cannot communicate with the Thrift JDBC server.

To start the Spark SQL CLI, run the following command in the bin directory of Spark:

./spark-sql 


Conclusion

This article introduced Spark SQL: the basic usage of the DataFrame and DataSet APIs, the basic principles of the Catalyst Optimizer, Spark SQL programming, Spark SQL data sources, integration with Hive, and the Thrift Server and Spark SQL CLI. The next post will cover the Spark Streaming programming guide.