This article analyzes and summarizes the principles behind Spark SQL, DataFrame, and Dataset, and their optimization process, in detail.

Spark SQL overview

  1. Spark SQL is Spark's structured data processing module for large-scale relational queries (Spark's core components are Spark SQL, Spark Streaming, Spark MLlib, and Spark GraphX). The DataFrame API and Dataset API are the ways to interact with Spark SQL; they support large-scale distributed in-memory computation and blur the distinction between RDDs and relational tables.
  2. Spark SQL runs on top of Spark Core. It allows developers to import relational data from Hive tables and Parquet files, run SQL queries against the imported data and existing RDDs, and easily write RDDs back to Hive tables or Parquet files. Through the DataFrame API provided by Spark SQL, relational operations can be performed on both external data sources and Spark's built-in distributed collections. Spark SQL also introduces an extensible optimizer called Catalyst.
  3. Spark SQL provides three main capabilities for working with structured and semi-structured data: a) It offers DataFrames in Scala, Java, and Python to simplify work with structured data sets; DataFrames are similar to tables in a relational database. b) Spark SQL can read and write data in various structured formats, for example Hive tables, JSON, and Parquet. c) Data can be queried with SQL statements, both from within a Spark program and from external tools that connect to Spark SQL (see the sketch after this list).
  4. Spark SQL has the following features: a) Data compatibility: data can be obtained from Hive tables, external databases (JDBC), RDDs, Parquet files, and JSON files; data can be manipulated in Scala or SQL, and the results can be converted back to RDDs. b) Component extensibility: the SQL parser, analyzer, and optimizer can all be redefined. c) Performance optimization: in-memory columnar storage, dynamic bytecode generation, and other optimization techniques, plus in-memory caching of data. d) Multi-language support: Scala, Java, Python, and R.
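As a quick illustration of points 3 and 4, the sketch below reads a JSON file, queries it with SQL, and writes the result out as Parquet. This is only a minimal sketch: the SparkSession named spark and the file paths are assumptions for illustration, not part of the original example.

// Minimal sketch; `spark` is an assumed SparkSession and the paths are hypothetical.
val people = spark.read.json("data/people.json")
people.createOrReplaceTempView("people")
val adults = spark.sql("SELECT name, age FROM people WHERE age >= 18")
adults.write.parquet("output/adults")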

Spark SQL execution plan

Catalyst Optimizer

The Catalyst Optimizer is written in Scala and built on its functional programming features. Catalyst supports both rule-based and cost-based optimization.

  • In rule-based optimization, the rule-based optimizer uses a rule set to determine how to execute a query.
  • Cost-based optimization is about finding the most appropriate way to execute SQL statements. Cost-based optimization uses rules to generate multiple plans and then calculate their costs.
  • Unresolved logical Plan: Much like the compiler, Spark optimizations are multistage, requiring resolution of expression references and types before any optimizations can be performed.
  • Logical Plan: Based on the Unresolved Logical Plan, Schema information is added. Spark directly simplifies and optimizes the Logical Plan and generates an optimized Logical Plan. These simplifications can be written with rules such as matching patterns. The optimizer is not limited to pattern matching, but rules can also contain arbitrary Scala code.
  • Once the Logical plan is optimized, Spark generates a physical plan. The physical planning phase involves both rule-based and cost-based optimization to produce the best physical plan.
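These stages can be inspected for a concrete query with explain(true), which prints the parsed (unresolved) logical plan, the analyzed logical plan, the optimized logical plan, and the physical plan. A minimal sketch, assuming a DataFrame df with columns zip and size (both names are illustrative):

// Assumed DataFrame `df` with columns `zip` and `size`.
val query = df.filter(df("size") > 0).groupBy("zip").count()
// Prints the parsed, analyzed, and optimized logical plans plus the physical plan.
query.explain(true)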

Spark SQL optimization

Spark SQL optimization improves the productivity of developers and the performance of the queries they write. A good query optimizer will automatically rewrite relational queries to perform more efficiently using techniques such as filtering data early, leveraging available indexes, and even ensuring that different data sources are joined in the most efficient order.

By performing these transformations, the optimizer improves the execution time of relational queries and frees developers to focus on the semantics of their applications rather than on performance.

Catalyst leverages Scala’s power, such as pattern matching and runtime metaprogramming, to make it easy for developers to specify complex relationship optimizations.

SparkSession

Before Spark 2.0

  • SparkContext: prior to 2.0, SparkContext served as the entry point for a Spark application, and the Spark driver used SparkContext to connect to the cluster.
  • SparkConf: used to specify configuration parameters, such as the application name, driver and application settings, the number of cores, and so on.
  • To use the SQL, Hive, and Streaming APIs, you needed to create separate contexts:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf()
val sc = new SparkContext(conf)
val hc = new HiveContext(sc)
val ssc = new StreamingContext(sc, Seconds(1))  // batch interval required by StreamingContext

In Spark 2.0 and later versions

  • SparkSession provides a single entry point for interacting with the underlying Spark functionality and allows Spark programs to be written with the DataFrame and Dataset APIs. All of the functionality provided by SparkContext is also available through SparkSession.
  • To use the SQL, Hive, and Streaming APIs, there is no need to create separate contexts, because SparkSession includes all of these APIs.
  • Once a SparkSession is instantiated, we can configure Spark's runtime configuration properties.
import org.apache.spark.sql.SparkSession

val session = SparkSession.builder()
  .enableHiveSupport()  // provides a connection to the Hive metastore
  .getOrCreate()

// Import the implicits; unlike in core Spark, the implicits are defined on the session.
import session.implicits._
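For runtime configuration specifically, properties can be read and changed through the session's conf interface. A small sketch (the property and value shown are just an illustration):

// Set and read a runtime configuration property on the SparkSession created above.
session.conf.set("spark.sql.shuffle.partitions", "200")  // illustrative value
println(session.conf.get("spark.sql.shuffle.partitions"))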

Schemas

Like RDDs, the DataFrame and Dataset in Spark SQL are distributed collections. But compared with an RDD, a DataFrame or Dataset carries additional schema information. As described above, this schema can be used by the Catalyst optimizer.

Specify the schema with a case class

case class RawPanda(id: Long, zip: String, pt: String, happy: Boolean, attributes: Array[Double])
case class PandaPlace(name: String, pandas: Array[RawPanda])

def createAndPrintSchema() = {
  val damao = RawPanda(1, "giant", "giant", true, Array(0.1, 1))
  val pandaPlace = PandaPlace("Toronto", Array(damao))
  val df = session.createDataFrame(Seq(pandaPlace))
  df.printSchema()
}

Specify the schema directly with StructType

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

val personRDD = sc.textFile(args(0)).map(_.split(" "))
// Specify the schema directly
val schema = StructType(List(
  StructField("id", IntegerType, true),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true)))
// Map each split line to a Row
val rowRDD = personRDD.map(p => Row(p(0).toInt, p(1).trim, p(2).toInt))
// Apply the schema information to the Row RDD
val personDataFrame = sqlContext.createDataFrame(rowRDD, schema)

DataFrame

A DataFrame is similar to an RDD in that it is immutable, resilient, distributed, and lazily evaluated, with both transformations and actions. Unlike an RDD, a DataFrame is designed for large amounts of structured data: it contains rows with a schema, similar to the header row of a pandas DataFrame.

Note: in contrast to RDD's fully lazy design, a DataFrame is only partially lazy; for example, the schema is computed eagerly.

Why use DataFrame

In contrast to RDD, DataFrame provides memory management and optimized execution plans.

  • Custom memory management: also known as Project Tungsten. Spark is developed in Scala, and the JVM implementation introduces performance limitations and drawbacks (e.g. GC overhead and Java serialization time) that keep Spark from matching the performance of lower-level languages such as C. Tungsten therefore introduces a memory management mechanism in which memory is allocated and managed explicitly rather than by the JVM, and Spark operates directly on binary data instead of JVM objects.
  • Optimized execution plans: this refers to the query optimizer (the Catalyst Optimizer). With it, an optimized execution plan is created for each query; once the optimized plan is created, it is ultimately executed on Spark's RDDs.

For more information about JVM memory, see off-heap memory versus on-heap memory in the JVM [1]. For more information about Tungsten, see Tungsten – GitHub [2].

DataFrame Tips:

  • $ can be used to refer to a column of a DataFrame implicitly.
  • In a Spark DataFrame, use === and =!= to filter rows on a specific column (see the sketch after these tips).
  • Compared with a filter operation, distinct and dropDuplicates may trigger a shuffle and can therefore be slow.
  • Unlike RDD groupBy, groupBy on a DataFrame aggregates locally first and then globally (see DataFrame/Dataset groupBy behaviour/optimization [3]).
  • If you need to compute several complex statistics, it is recommended to call agg on the grouped data returned by groupBy:
def minMeanSizePerZip(pandas: DataFrame): DataFrame = {
  // Compute the min and mean
  pandas.groupBy(pandas("zip")).agg(
    min(pandas("pandaSize")), mean(pandas("pandaSize")))
}
  • Sometimes it is more efficient to use SQL expressions on Hive data than to operate on the DataFrame directly:
def registerTable(df: DataFrame): Unit = {
  df.registerTempTable("pandas")
  df.write.saveAsTable("perm_pandas")
}

def querySQL(): DataFrame = {
  sqlContext.sql("SELECT * FROM pandas WHERE size > 0")
}
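As mentioned in the tips above, $ and ===/=!= give concise column expressions for filtering. A minimal sketch, assuming a DataFrame named pandas with columns zip (String) and happy (Boolean); the names are illustrative:

import session.implicits._  // enables the $"colName" syntax

// Filter rows where zip equals "toronto", and rows where happy is not true.
val torontoPandas = pandas.filter($"zip" === "toronto")
val unhappyPandas = pandas.filter(pandas("happy") =!= true)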

Dataset

A Dataset is a data structure in Spark SQL that is strongly typed and carries a schema (the types of the fields are specified). Dataset is an extension of the DataFrame API; it provides a type-safe, object-oriented programming interface. See Magic Lies Here – Statically vs Dynamically Typed Languages [4] for definitions of strong typing and type safety.

Dataset has the following characteristics:

  • RDD-like convenience and flexibility when writing code.
  • The optimized performance of DataFrames (it also uses Tungsten and the Catalyst query optimizer).
  • The static, type-safe nature of the Scala language.
  • With a Spark Dataset, syntax and analysis errors can be caught at compile time, in a way that DataFrames, RDDs, or plain SQL queries cannot (a small sketch follows).
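A minimal sketch of these characteristics, assuming the RawPanda case class defined earlier and a SparkSession named session:

import org.apache.spark.sql.Dataset
import session.implicits._

// Build a typed Dataset from a local collection.
val ds: Dataset[RawPanda] = Seq(RawPanda(1, "giant", "giant", true, Array(0.1, 1))).toDS()
// Typed, compile-time-checked operations: a misspelled field would not compile.
val happyIds: Dataset[Long] = ds.filter(_.happy).map(_.id)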

Differences between RDD, DataFrame, and Dataset

Differences in data format

  • RDD: makes it easy and efficient to process both structured and unstructured data. But unlike DataFrames and Datasets, an RDD does not infer schema information; the user has to specify it.
  • DataFrame: applies only to structured and semi-structured data, for which schema information can be inferred. DataFrames let Spark manage the schema.
  • Dataset: also processes structured and unstructured data efficiently. It represents data as a typed JVM object or a collection of such objects, whereas a DataFrame's rows are generic, untyped Row objects (a DataFrame is a Dataset[Row]).

Converting between the three


  • When converting an RDD to a DataFrame (this executes immediately rather than lazily), the schema information can be supplied in the following three ways:

    def createFromCaseClassRDD(input: RDD[PandaPlace]) = {
        // Create DataFrame explicitly using session and schema inference
        val df1 = session.createDataFrame(input)
        // Create DataFrame using session implicits and schema inference
        val df2 = input.toDF()
        // Create a Row RDD from our RDD of case classes
        val rowRDD = input.map(pm => Row(pm.name,
          pm.pandas.map(pi => Row(pi.id, pi.zip, pi.happy, pi.attributes))))
        val pandasType = ArrayType(StructType(List(
          StructField("id", LongType, true),
          StructField("zip", StringType, true),
          StructField("happy", BooleanType, true),
          StructField("attributes", ArrayType(FloatType), true))))
    
    
        // Create DataFrame explicitly with specified schema
        val schema = StructType(List(StructField("name", StringType, true),
          StructField("pandas", pandasType)))
        val df3 = session.createDataFrame(rowRDD, schema)
      }
  • DataFrame to RDD: simply calling df.rdd yields an RDD of Row objects. Since each Row can contain anything, you need to specify concrete types to retrieve the contents of each column:

    def toRDD(input: DataFrame): RDD[RawPanda] = {
      val rdd: RDD[Row] = input.rdd
      rdd.map(row => RawPanda(row.getAs[Long](0), row.getAs[String](1),
        row.getAs[String](2), row.getAs[Boolean](3), row.getAs[Array[Double]](4)))
    }
  • Converting to and from a Dataset:

    /** Convert a DataFrame to a Dataset (RawPanda is a case class). */
    def fromDF(df: DataFrame): Dataset[RawPanda] = {
      df.as[RawPanda]
    }

    /** A Dataset can easily be converted back to an RDD. */
    def toRDD(ds: Dataset[RawPanda]): RDD[RawPanda] = {
      ds.rdd
    }

    /** Illustrate converting a Dataset to a DataFrame. */
    def toDF(ds: Dataset[RawPanda]): DataFrame = {
      ds.toDF()
    }

Static typing versus runtime type safety

If you use Spark SQL query strings, you will not find syntax errors until runtime (which can be costly), whereas with DataFrames and Datasets syntax errors are caught at compile time (saving developer time and cost). That is, if you call a function that is not part of the DataFrame API, the compiler catches it; however, if you reference a field name that does not exist, the error only surfaces at runtime.

The Dataset API is expressed with lambda functions and JVM typed objects, so any mismatched type parameter is found at compile time. Analysis errors are also detected at compile time when using a Dataset, saving developer time and cost. For example, a DataFrame compiles without checking column information (df.select("name") and df.select("naame") both compile without error), whereas a Dataset detects such errors at compile time.
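A small sketch of the difference, assuming a DataFrame df and a Dataset[RawPanda] ds that both contain an id column/field:

df.select("naame")   // compiles; the misspelled column only fails at runtime (AnalysisException)
// ds.map(_.naame)   // does not compile: value naame is not a member of RawPanda
ds.map(_.id)         // checked by the compiler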

Partitioning

How to customize partitioning for data distribution is important to avoid the headache of data skew.

Staged aggregation

  • Use shuffle operators with map-side pre-aggregation. Map-side pre-aggregation means aggregating identical keys locally on each node first, similar to a local combiner in MapReduce. After map-side pre-aggregation, each node holds only one entry per local key, because multiple identical keys have been combined. When other nodes then pull that key from all nodes, the amount of data to pull is greatly reduced, which reduces disk I/O and network transfer overhead. In general, it is recommended to use the reduceByKey or aggregateByKey operators instead of groupByKey where possible, because both apply a user-defined function to pre-aggregate identical local keys on each node, whereas groupByKey performs no pre-aggregation and ships the full data set between nodes, with poor performance (a small comparison sketch follows the random-key code below). Unlike groupBy on an RDD, groupBy on a DataFrame already aggregates locally and then globally.
  • Add a random number to the key and aggregate in stages. The core idea is to aggregate twice. For the first, local aggregation, each key is given a random number, for example a random number below 10, so identical keys become different: (hello, 1) (hello, 1) (hello, 1) (hello, 1) becomes (1_hello, 1) (1_hello, 1) (2_hello, 1) (2_hello, 1). Running reduceByKey or another aggregation on the randomized data performs the local aggregation, producing (1_hello, 2) (2_hello, 2). Removing the random number then gives (hello, 2) (hello, 2), and a second, global aggregation yields the final result (hello, 4). By attaching random numbers, one key is split into several different keys, so data that would have been processed by a single task is spread across multiple tasks for local aggregation, solving the problem of a single task handling too much data. The random part is then removed and a global aggregation produces the final result. Reference code using random keys is as follows:
import scala.util.Random

val repartRdd = originRdd
  // Split lines into words
  .flatMap(_.split(" "))
  // Map each word to a (word, 1) tuple
  .map((_, 1))
  // Attach a random suffix to each key
  .map(t => {
    val rnum = Random.nextInt(partitionNum)
    (t._1 + "_" + rnum, 1)
  })
  // First (local) aggregation
  .reduceByKey(_ + _)
  // Remove the random suffix from the key
  .map(e => {
    val word = e._1.substring(0, e._1.indexOf("_"))
    val count = e._2
    (word, count)
  })
  // Second (global) aggregation
  .reduceByKey(_ + _)
  // Sort by count (false -> descending, true -> ascending)
  .map(e => (e._2, e._1))
  .sortByKey(false)
  .map(e => (e._2, e._1))
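For the first bullet above, a minimal word-count sketch contrasting groupByKey with reduceByKey, assuming an RDD[String] named words:

// No map-side pre-aggregation: every (word, 1) pair is shuffled across the network.
val slowCounts = words.map((_, 1)).groupByKey().mapValues(_.sum)

// Map-side pre-aggregation: identical keys are combined locally before the shuffle.
val fastCounts = words.map((_, 1)).reduceByKey(_ + _)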

Hash Partitioning in Spark and Range Partitioning in Spark

For an explanation of the partitioning methods in Spark, see www.cnblogs.com/tongxupeng/…

  • DataFrame.repartition(col): partitions the data by the given column expression, using hash partitioning by default (without a column expression, rows are redistributed by random keys).
  • DataFrame.repartitionByRange(col): partitions the data by the given column expression, using range partitioning.
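A minimal sketch of both calls, assuming a DataFrame df with a column zip (repartitionByRange is available from Spark 2.3 onward):

val hashed = df.repartition(100, df("zip"))          // hash partitioning on zip
val ranged = df.repartitionByRange(100, df("zip"))   // range partitioning on zip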

Spark's DataFrame and Dataset have no custom partitioning API. You can first apply a custom partitioner on the underlying RDD and then convert it back to a DataFrame:

sqlContext.createDataFrame(
  df.rdd.map(r => (r.getInt(1), r)).partitionBy(partitioner).values,
  df.schema
)

Use UDFs & UDAFs

User-defined functions (UDFs) and user-defined aggregate functions (UDAFs) provide ways to extend the DataFrame and SQL APIs with your own custom code while still benefiting from the Catalyst optimizer. This is very useful for performance; otherwise you would need to convert the data to an RDD (and possibly back again) to run arbitrary functions, which is expensive. UDFs and UDAFs can also be used from SQL query expressions.

Note: writing UDF and UDAF functions in Python loses this performance advantage.

UDFs

Spark 2.x:

def get_max(x: Double, y: Double): Double={
    if ( x > y )
      x
    else
      y
 }


import org.apache.spark.sql.functions.udf

val udf_get_max = udf(get_max _)
df = df.withColumn("max_fea", udf_get_max(df("fea1"), df("fea2")))
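As noted above, UDFs can also be used from SQL expressions once they are registered by name. A small sketch; the function name and view name are illustrative:

// Register the same Scala function for use in SQL, then call it in a query.
spark.udf.register("get_max", get_max _)
df.createOrReplaceTempView("features")
spark.sql("SELECT get_max(fea1, fea2) AS max_fea FROM features").show()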

UDAFs

Compared with UDFs, UDAFs are more complex to write: you need to extend UserDefinedAggregateFunction and implement its methods, but UDAF performance is quite good. A UDAF can be used directly on columns, or it can be registered in the function registry just like a non-aggregate UDF.

Example UDAF code for calculating averages:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

object AverageUserDefinedAggregateFunction extends UserDefinedAggregateFunction {
  // Input data structure of the aggregate function
  override def inputSchema: StructType = StructType(StructField("input", LongType) :: Nil)

  // Data structure of the aggregation buffer
  override def bufferSchema: StructType = StructType(StructField("sum", LongType) :: StructField("count", LongType) :: Nil)

  // Data type of the aggregate function's result
  override def dataType: DataType = DoubleType

  // Whether the function always returns the same result for the same input
  override def deterministic: Boolean = true

  // Initialize the aggregation buffer
  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = 0L
    buffer(1) = 0L
  }

  // Update the buffer with a new input row
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    if (input.isNullAt(0)) return
    buffer(0) = buffer.getLong(0) + input.getLong(0)
    buffer(1) = buffer.getLong(1) + 1
  }

  // Merge two aggregation buffers
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Compute the final result
  override def evaluate(buffer: Row): Any = buffer.getLong(0).toDouble / buffer.getLong(1)
}

Then register and use the function in the main function:

spark.read.json("data/user").createOrReplaceTempView("v_user")
spark.udf.register("u_avg", AverageUserDefinedAggregateFunction)
// Treat the whole table as one group and compute the average age of everyone
spark.sql("select count(1) as count, u_avg(age) as avg_age from v_user").show()
// Group by sex and compute the average age per group
spark.sql("select sex, count(1) as count, u_avg(age) as avg_age from v_user group by sex").show()

References

  • High Performance Spark
  • data-flair.training/blogs/spark…
  • www.quora.com/What-is-the…
  • www.cnblogs.com/netoxi/p/72…
  • www.infoq.cn/article/thr…
  • tech.meituan.com/2016/05/12/…
  • www.cnblogs.com/tongxupeng/…

Resources

[1] Off-heap memory vs. on-heap memory in the JVM: blog.csdn.net/khxu666/art…

[2] Tungsten – GitHub: github.com/hustnn/Tung…

[3] DataFrame/Dataset groupBy behaviour/optimization: stackoverflow.com/questions/3…

[4] Magic Lies Here – Statically vs Dynamically Typed Languages: android.jlel.eu/Magic-lies-…

