1. An overview of the DataFrame

In Spark, a DataFrame is an RDD-based distributed data set, similar to a two-dimensional table in a traditional database. The main difference between a DataFrame and an RDD is that a DataFrame carries schema meta-information: each column of the two-dimensional table it represents has a name and a type. This lets Spark SQL see into the structure of the data and optimize both the data sources behind the DataFrame and the transformations applied to it, which yields significant run-time gains. With an RDD, by contrast, Spark Core does not know the internal structure of the stored data elements, so it can only perform simple, general pipelined optimization at the stage level.

Like Hive, DataFrame also supports nested data types (struct, array, and map). In terms of ease of use, the DataFrame API provides a high-level set of relational operations that are friendlier and easier to use than the functional RDD API.

The figure above visually illustrates the difference between DataFrame and RDD.

The RDD[Person] on the left takes Person as a type parameter, but the Spark framework does not know the internal structure of the Person class. The DataFrame on the right provides detailed structural information, allowing Spark SQL to know exactly what columns are in the dataset, and what the name and type of each column is.
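As a rough sketch of this difference in spark-shell (the Person class and the sample data here are assumed purely for illustration; they are not used elsewhere in this article):

scala> case class Person(name: String, age: Long)

// To Spark Core, this RDD is just a collection of opaque Person objects
scala> val personRDD = sc.parallelize(Seq(Person("qiaofeng", 18), Person("duanyu", 19)))

// Converting it to a DataFrame attaches the schema: a string column "name" and a long column "age"
scala> val personDF = personRDD.toDF()
scala> personDF.printSchema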

A DataFrame is a view that attaches a schema to the data; you can treat it like a table in a database.

Like an RDD, a DataFrame is lazy, but it generally performs better, mainly because of its optimized execution plan: the query plan is optimized by Spark's Catalyst optimizer. To illustrate query optimization, consider the demographic data analysis shown in the figure above: two DataFrames are constructed, they are joined, and then a filter is applied to the joined result.

If this plan were executed as written, it would not be efficient: join is an expensive operation and may also produce a large intermediate data set. If we instead push the filter down below the join, filter each DataFrame first, and then join the much smaller filtered results, we can shorten the execution time considerably. Spark SQL's query optimizer does exactly that. In short, logical query plan optimization is the process of replacing high-cost operations with low-cost ones through equivalent transformations based on relational algebra.
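You can watch the optimizer do this by asking Spark for the query plan. Below is a minimal sketch in spark-shell; the second DataFrame and its columns are made up purely for illustration:

scala> val people = spark.read.json("/opt/module/spark-local/people.json")
scala> val scores = Seq(("qiaofeng", 95), ("duanyu", 80)).toDF("name", "score")

// Written in the "inefficient" order: join first, filter afterwards
scala> val joined = people.join(scores, "name").filter($"age" > 18)

// explain(true) prints the parsed, analyzed, optimized and physical plans;
// in the optimized logical plan the age filter appears below the join
scala> joined.explain(true)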

2. Three ways to create a DataFrame

1. Create it from the Spark data source

  1. View the data source formats that Spark supports for creating a DataFrame
scala> spark.read.
csv   format   jdbc   json   load   option   options   orc   parquet
schema   table   text   textFile
  2. Read the JSON file to create the DataFrame
scala> val df = spark.read.json("/opt/module/spark-local/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

Note: if the data comes from memory, Spark knows exactly what type it is; a number is treated as Int by default. For numbers read from a file, however, Spark cannot determine the exact type, so they are received as bigint, which is interchangeable with Long but cannot be converted to Int.
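A quick way to see this in spark-shell is to compare the schema of a DataFrame built from in-memory data with the JSON-backed df above (a minimal sketch):

scala> Seq(("qiaofeng", 18)).toDF("name", "age").printSchema
// here age is reported as integer, whereas the df created from the JSON file reports it as bigint (long)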

  3. Show the result
scala> df.show
+---+--------+
|age|    name|
+---+--------+
| 18|qiaofeng|
| 19|  duanyu|
| 20|   xuzhu|
+---+--------+

2. Convert it from an existing RDD
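In short, with spark.implicits._ imported, an RDD can be turned into a DataFrame with toDF; section 5 below walks through this in detail. A one-line sketch with made-up data:

scala> sc.parallelize(Seq(("qiaofeng", 18), ("duanyu", 19))).toDF("name", "age")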

3. Query it from a Hive table
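A sketch of the Hive path (this assumes the SparkSession was built with Hive support enabled and that a Hive table named people already exists; both are assumptions here):

scala> val hiveDF = spark.table("people")   // or: spark.sql("SELECT * FROM people")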

3. SQL-style syntax

Queries in this style require the help of a temporary view or a global temporary view.

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Create a temporary table on the DataFrame

scala> df.createOrReplaceTempView("people")

3) Query the whole table with a SQL statement

scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

4) Show the result

scala> sqlDF.show
+---+--------+
|age|    name|
+---+--------+
| 18|qiaofeng|
| 19|  duanyu|
| 20|   xuzhu|
+---+--------+

Note: an ordinary temporary table is scoped to the current session. If you need it to be valid across the whole application, use a global temporary table instead. A global temporary table must be accessed by its full path, for example global_temp.people.

5) Create a global temporary table from the DataFrame

scala> df.createGlobalTempView("people")

6) Query the whole table with a SQL statement, including from a new session

scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|    name|
+---+--------+
| 18|qiaofeng|
| 19|  duanyu|
| 20|   xuzhu|
+---+--------+

scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age|    name|
+---+--------+
| 18|qiaofeng|
| 19|  duanyu|
| 20|   xuzhu|
+---+--------+

4. DSL-style syntax

DataFrame provides a domain-specific language (DSL) for managing structured data, which can be used from Scala, Java, Python, and R. With the DSL style there is no need to create a temporary view.

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) View the Schema information of the DataFrame

scala> df.printSchema
root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

3) View only the data in the "name" column

scala> df.select("name").show()
+--------+
|    name|
+--------+
|qiaofeng|
|  duanyu|
|   xuzhu|
+--------+

4) View all columns

scala> df.select("*").show
+--------+---------+
|    name|      age|
+--------+---------+
|qiaofeng|       18|
|  duanyu|       19|
|   xuzhu|       20|
+--------+---------+

5) View the "name" column and the "age" column plus 1

Note: when a column takes part in a calculation, each column must be referenced with $

scala> df.select($"name", $"age" + 1).show
+--------+---------+
|    name|(age + 1)|
+--------+---------+
|qiaofeng|       19|
|  duanyu|       20|
|   xuzhu|       21|
+--------+---------+
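The same selection can also be written with the col function from org.apache.spark.sql.functions instead of the $ shorthand; the two forms are equivalent:

scala> import org.apache.spark.sql.functions.col
scala> df.select(col("name"), col("age") + 1).show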

6) View rows whose "age" is greater than 19

scala> df.filter($"age">19).show
+---+-----+
|age| name|
+---+-----+
| 20|xuzhu|
+---+-----+
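filter also accepts a SQL expression string, which avoids the $ syntax entirely (an equivalent one-liner):

scala> df.filter("age > 19").show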

7) Group the data by "age" and view the number of rows in each group

scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19|    1|
| 18|    1|
| 20|    1|
+---+-----+
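Besides count, other aggregations can be expressed with agg and the functions in org.apache.spark.sql.functions (a small sketch on the same df):

scala> import org.apache.spark.sql.functions._
scala> df.agg(avg("age"), max("age")).show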

5. Convert RDD to DataFrame

Note: import spark.implicits._ (here spark is not a package name but the name of the SparkSession object, so the SparkSession object must be created before the import).

Prerequisites:

  • Import the implicit transformation and create an RDD
  • people.txt is prepared in /opt/module/spark-local/ with the following content:
qiaofeng,18
xuzhu,19
duanyu,20
scala> import spark.implicits._
import spark.implicits._

scala> val peopleRDD = sc.textFile("/opt/module/spark-local/people.txt")
peopleRDD: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/people.txt MapPartitionsRDD[3] at textFile at <console>:27

1. Convert manually (determine the columns by hand)

scala> peopleRDD.map{x => val fields = x.split(","); (fields(0), fields(1).trim.toInt)}.toDF("name", "age").show
+--------+---+
|    name|age|
+--------+---+
|qiaofeng| 18|
|   xuzhu| 19|
|  duanyu| 20|
+--------+---+

2. Case class reflection conversion (commonly used)

  1. Create a case class
scala> case class People(name:String,age:Int)
  2. Convert the RDD to a DataFrame via the case class
scala> peopleRDD.map{x => val fields = x.split(","); People(fields(0), fields(1).toInt)}.toDF.show
+--------+---+
|    name|age|
+--------+---+
|qiaofeng| 18|
|   xuzhu| 19|
|  duanyu| 20|
+--------+---+
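With the same case class, the RDD can also be converted into a strongly typed Dataset[People] rather than a DataFrame (a closely related sketch using toDS):

scala> peopleRDD.map{x => val fields = x.split(","); People(fields(0), fields(1).toInt)}.toDS.show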

3. Programmatically, with a StructType (good to understand, rarely used)

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{DataFrame, Row, SparkSession}

object DataFrameDemo2 {
    def main(args: Array[String]): Unit = {
        val spark: SparkSession = SparkSession.builder()
            .master("local[*]")
            .appName("Word Count")
            .getOrCreate()
        val sc: SparkContext = spark.sparkContext
        val rdd: RDD[(String, Int)] = sc.parallelize(Array(("lisi", 10), ("zs", 20), ("zhiling", 40)))
        // Map to an RDD[Row], because a DataFrame is a Dataset[Row]
        val rowRdd: RDD[Row] = rdd.map(x => Row(x._1, x._2))
        // Describe the schema with a StructType
        val types = StructType(Array(StructField("name", StringType), StructField("age", IntegerType)))
        val df: DataFrame = spark.createDataFrame(rowRdd, types)
        df.show
    }
}

6. Convert DataFrame to RDD

Just call the rdd method on the DataFrame directly.

1) Create a DataFrame

scala> val df = spark.read.json("/opt/module/spark-local/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) Convert the DataFrame to an RDD. Note: the element type of the resulting RDD is Row

scala> val dfToRDD = df.rdd
dfToRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[19] at rdd at <console>:29

3) Print the RDD

scala> dfToRDD.collect
res3: Array[org.apache.spark.sql.Row] = Array([18,qiaofeng], [19,duanyu], [20,xuzhu])
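Since each element of the resulting RDD is a Row, its fields can be read by name (or by position). A minimal sketch:

scala> dfToRDD.map(row => (row.getAs[String]("name"), row.getAs[Long]("age"))).collect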