Introduction

org.apache.spark.sql.Dataset is the core class of Spark SQL. It is defined as follows:

class Dataset[T] extends Serializable

DataFrame is an alias for Dataset[Row].

This article is based on Spark 2.3.0.
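
The hedged examples sprinkled through this article reuse the minimal setup below. The local SparkSession and the Person case class are assumptions made here for illustration, not part of the original article:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("dataset-intro")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Hypothetical type used only for the examples.
case class Person(name: String, age: Int)

// A small typed Dataset built from a local collection.
val peopleDs = Seq(Person("Ann", 20), Person("Bob", 31), Person("Eve", 16)).toDS()
// DataFrame is just Dataset[Row].
val peopleDf = peopleDs.toDF()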

The following is an introduction to the class methods.

Class methods

Actions

collect(): Array[T]
Returns an array containing all rows of the Dataset. Note: all of the data is loaded into the memory of the driver process.

collectAsList(): java.util.List[T]
Same as above, but returns a Java list.

count(): Long
Returns the number of rows.

describe(cols: String*): DataFrame
Computes count, mean, stddev, min, and max for the specified columns.

head(): T
Returns the first row.

head(n: Int): Array[T]
Returns the first n rows.

first(): T
Returns the first row; an alias for head().

foreach(f: (T) ⇒ Unit): Unit
Applies the function f to every element.

foreachPartition(f: (Iterator[T]) ⇒ Unit): Unit
Applies the function f to every partition.

reduce(func: (T, T) ⇒ T): T
Reduces the elements of the Dataset with the function func. Note: the function should be commutative and associative, otherwise the result is nondeterministic.

show(numRows: Int, truncate: Int, vertical: Boolean): Unit
Prints the data in tabular form. numRows: the number of rows to display; truncate: trim string values to the given length; vertical: print the rows vertically.

show(numRows: Int, truncate: Int): Unit
show(numRows: Int, truncate: Boolean): Unit
show(truncate: Boolean): Unit
Defaults: numRows = 20, truncate = 20.

show(numRows: Int): Unit
Default: truncate = 20.

show(): Unit
Defaults: numRows = 20, truncate = 20.

summary(statistics: String*): DataFrame
Computes count, mean, stddev, min, approximate quartiles (the 25%, 50%, and 75% percentiles), and max. If no statistics are specified, all of them are computed.

take(n: Int): Array[T]
Returns the first n rows.

takeAsList(n: Int): java.util.List[T]
Returns the first n rows as a Java list.

toLocalIterator(): Iterator[T]
Returns an iterator over all rows. The iterator will consume as much memory as the largest partition in this Dataset.


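A quick sketch of a few of these actions, using the peopleDs defined in the introduction:

peopleDs.count()                        // number of rows
peopleDs.show(10, truncate = false)     // print up to 10 rows without trimming values
val everyone = peopleDs.collect()       // Array[Person]; everything is pulled into the driver
val maxAge = peopleDs.map(_.age).reduce((a, b) => math.max(a, b))  // max is commutative and associative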

Basic Dataset functions

as[U](implicit arg0: Encoder[U]): Dataset[U]
Maps the data to the specified type U and returns a new Dataset.
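
For example (a sketch reusing the hypothetical Person case class from the introduction):

// DataFrame (Dataset[Row]) back to a typed Dataset; column names must match the case class fields.
val typedAgain = peopleDf.as[Person]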

persist(newLevel: StorageLevel): Dataset.this.type
Caches the data; the storage level can be specified.

persist(): Dataset.this.type
cache(): Dataset.this.type
Caches the data using the MEMORY_AND_DISK level. Note: the default cache level for an RDD is MEMORY_ONLY.

checkpoint(eager: Boolean): Dataset[T]
Returns a checkpointed Dataset; checkpointing truncates the Dataset's logical execution plan.

checkpoint(): Dataset[T]
Same as above, with eager = true.
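
A small sketch of caching and checkpointing; the checkpoint directory is a made-up path and must be set before checkpoint() is called:

import org.apache.spark.storage.StorageLevel

peopleDs.persist(StorageLevel.MEMORY_AND_DISK)   // explicit storage level
peopleDs.cache()                                 // same as persist() with MEMORY_AND_DISK
spark.sparkContext.setCheckpointDir("/tmp/spark-checkpoints")  // hypothetical directory
val snapshot = peopleDs.checkpoint()             // eager = true, truncates the logical plan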

columns: Array[String]
Returns all column names as an array.

dtypes: Array[(String, String)]
Returns all column names and their data types as an array.

createGlobalTempView(viewName: String): Unit
Creates a global temporary view. Its lifetime is tied to the Spark application, and it can be accessed across sessions, e.g. SELECT * FROM global_temp.view1.

createOrReplaceGlobalTempView(viewName: String): Unit
Same as above, but replaces the view if it already exists.

createTempView(viewName: String): Unit
Creates a local temporary view, visible only within the current SparkSession. Note: it is not bound to any database, so it cannot be accessed as db1.view1.

createOrReplaceTempView(viewName: String): Unit
Same as above, but replaces the view if it already exists.

explain(): Unit
Prints the physical execution plan. (The queryExecution field exposes the complete execution plan.)

explain(extended: Boolean): Unit
Prints both the logical and physical execution plans.

hint(name: String, parameters: Any*): Dataset[T]
Specifies a hint on the current Dataset. //TODO
e.g. df1.join(df2.hint("broadcast"))
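
A sketch of the view methods and explain; the view names are illustrative:

peopleDs.createOrReplaceTempView("people")            // visible in this SparkSession only
spark.sql("SELECT name FROM people WHERE age > 18").show()

peopleDs.createOrReplaceGlobalTempView("people_g")    // visible across sessions of the application
spark.sql("SELECT * FROM global_temp.people_g").show()

peopleDs.filter($"age" > 18).explain(extended = true) // logical + physical plans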

inputFiles: Array[String]
Returns a best-effort snapshot of the files that compose this Dataset.

isLocal: Boolean
Whether collect and take can be executed locally, without any executors.

localCheckpoint(eager: Boolean): Dataset[T]
Performs a local checkpoint and returns the new Dataset.

localCheckpoint(): Dataset[T]
Same as above, with eager = true.

printSchema(): Unit
Prints the schema.

rdd: RDD[T]
The underlying RDD of this Dataset.

schema: StructType
The schema of this Dataset.

storageLevel: StorageLevel
The current storage level, or StorageLevel.NONE if the Dataset is not persisted.

toDF(): DataFrame
toDF(colNames: String*): DataFrame
Converts the Dataset to a DataFrame (an RDD can also be converted to a DataFrame this way).

unpersist(): Dataset.this.type
unpersist(blocking: Boolean): Dataset.this.type
Removes the cached data. blocking indicates whether to block until all blocks have been deleted.

write: DataFrameWriter[T]
DataFrameWriter, the non-streaming data write interface.

writeStream: DataStreamWriter[T]
DataStreamWriter, the streaming data write interface.
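
For instance, a non-streaming write through DataFrameWriter; the output path is hypothetical:

peopleDf.write
  .mode("overwrite")          // SaveMode: append, overwrite, ignore, errorifexists
  .partitionBy("age")         // optional partition column
  .parquet("/tmp/people_parquet")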

Streaming functions

isStreaming: Boolean
Whether the Dataset contains streaming data.

withWatermark(eventTime: String, delayThreshold: String): Dataset[T]
Defines an event-time watermark for this Dataset. //TODO


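A hedged sketch of withWatermark on a streaming Dataset; the rate source is used only because it provides a timestamp column out of the box:

import org.apache.spark.sql.functions.window

val stream = spark.readStream.format("rate").load()    // columns: timestamp, value
val counts = stream
  .withWatermark("timestamp", "10 minutes")             // tolerate up to 10 minutes of late data
  .groupBy(window($"timestamp", "5 minutes"))
  .count()

counts.writeStream
  .outputMode("update")
  .format("console")
  .start()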

Typed transformations

alias(alias: Symbol): Dataset[T]
alias(alias: String): Dataset[T]
as(alias: Symbol): Dataset[T]
as(alias: String): Dataset[T]
Gives the Dataset an alias.

coalesce(numPartitions: Int): Dataset[T]
Merges partitions (can only reduce the number of partitions).

distinct(): Dataset[T]
Alias for dropDuplicates().

dropDuplicates(col1: String, cols: String*): Dataset[T]
dropDuplicates(colNames: Array[String]): Dataset[T]
dropDuplicates(colNames: Seq[String]): Dataset[T]
dropDuplicates(): Dataset[T]
Removes duplicate rows, considering only the specified columns (all columns if none are given).

except(other: Dataset[T]): Dataset[T]
Removes the rows that also appear in other; equivalent to EXCEPT DISTINCT in SQL. //TODO
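
A short illustration of dropDuplicates and except, with made-up data:

val a = Seq((1, "x"), (1, "x"), (2, "y")).toDF("id", "v")
val b = Seq((2, "y")).toDF("id", "v")

a.dropDuplicates().show()      // removes the repeated (1, "x") row
a.dropDuplicates("id").show()  // deduplicate on the id column only
a.except(b).show()             // rows of a that do not appear in b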

filter(func: (T) ⇒ Boolean): Dataset[T]
filter(conditionExpr: String): Dataset[T]
filter(condition: Column): Dataset[T]
Filters rows by the given condition, e.g.
peopleDs.filter("age > 15")
peopleDs.filter($"age" > 15)

flatMap[U](func: (T) ⇒ TraversableOnce[U])(implicit arg0: Encoder[U]): Dataset[U]
Like map, but each input element can produce zero or more output elements, which are then flattened into a single Dataset.

groupByKey[K](func: (T) ⇒ K)(implicit arg0: Encoder[K]): KeyValueGroupedDataset[K, T]
Derives a key from each element with the function func and groups the elements by that key.

intersect(other: Dataset[T]): Dataset[T]
Returns the intersection of the two Datasets; equivalent to INTERSECT in SQL.
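
For example, a minimal sketch of groupByKey and flatMap on the peopleDs from the introduction:

// Group by age and count each group; the result is a Dataset[(Int, Long)].
peopleDs.groupByKey(p => p.age).count().show()

// flatMap: each input row may produce zero or more output rows.
peopleDs.flatMap(p => p.name.split(" ")).show()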

joinWith[U](other: Dataset[U], condition: Column): Dataset[(T, U)]
Inner equi-join with another Dataset, keeping both sides as a pair.

joinWith[U](other: Dataset[U], condition: Column, joinType: String): Dataset[(T, U)]
joinType must be one of: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer.

limit(n: Int): Dataset[T]
Returns the first n rows. Unlike head, which is an action and immediately returns an array of results, limit is a transformation.

map[U](func: (T) ⇒ U)(implicit arg0: Encoder[U]): Dataset[U]
Applies the function func to every element and returns a Dataset of the results.

mapPartitions[U](func: (Iterator[T]) ⇒ Iterator[U])(implicit arg0: Encoder[U]): Dataset[U]
Applies the function func to every partition and returns a Dataset of the results.

orderBy(sortExprs: Column*): Dataset[T]
orderBy(sortCol: String, sortCols: String*): Dataset[T]
Alias for sort.

sort(sortExprs: Column*): Dataset[T]
sort(sortCol: String, sortCols: String*): Dataset[T]
Sorts by the specified columns, ascending by default, e.g. ds.sort($"col1", $"col2".desc).
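
A brief sketch of joinWith and sort; the scores Dataset is made up for the example:

val scores = Seq(("Ann", 90), ("Bob", 75)).toDS()   // Dataset[(String, Int)], columns _1 and _2

// Typed join: both sides are kept as a pair, Dataset[(Person, (String, Int))].
peopleDs.joinWith(scores, peopleDs("name") === scores("_1"), "inner").show()

// Sort by age descending, then by name ascending.
peopleDs.orderBy($"age".desc, $"name").show()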

sortWithinPartitions(sortExprs: Column*): Dataset[T]
sortWithinPartitions(sortCol: String, sortCols: String*): Dataset[T]
Sorts within each partition; same as "SORT BY" in SQL (Hive QL).

randomSplit(weights: Array[Double]): Array[Dataset[T]]
randomSplit(weights: Array[Double], seed: Long): Array[Dataset[T]]
Randomly splits the Dataset with the given weights.

repartition(partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int, partitionExprs: Column*): Dataset[T]
repartition(numPartitions: Int): Dataset[T]
Repartitions by hashing the specified expressions into the given number of partitions; same as "DISTRIBUTE BY" in SQL. The default number of partitions is spark.sql.shuffle.partitions.

repartitionByRange(partitionExprs: Column*): Dataset[T]
repartitionByRange(numPartitions: Int, partitionExprs: Column*): Dataset[T]
Repartitions by the specified expressions into the given number of partitions using a range partitioner (keys are partitioned by range). The default sort order of the partitioning expressions is ascending nulls first; the data inside each partition is not sorted.

sample(withReplacement: Boolean, fraction: Double): Dataset[T]
sample(withReplacement: Boolean, fraction: Double, seed: Long): Dataset[T]
sample(fraction: Double): Dataset[T]
sample(fraction: Double, seed: Long): Dataset[T]
Randomly samples the data. withReplacement: whether to sample with replacement; fraction: fraction of rows to generate, in the range [0.0, 1.0]; seed: seed for sampling.
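
A short sketch of splitting, sampling, and repartitioning; the weights and fractions are arbitrary:

val Array(train, test) = peopleDs.randomSplit(Array(0.8, 0.2), seed = 42L)

val sampled = peopleDs.sample(withReplacement = false, fraction = 0.5, seed = 42L)

val hashed = peopleDs.repartition(4, $"age")        // 4 hash partitions keyed by age
val ranged = peopleDs.repartitionByRange($"age")    // range partitions, spark.sql.shuffle.partitions of them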

select[U1](c1: TypedColumn[T, U1]): Dataset[U1]
Returns a typed Dataset by selecting the given column or expression.

transform[U](t: (Dataset[T]) ⇒ Dataset[U]): Dataset[U]
Applies the function t to transform the Dataset (see the sketch at the end of this section).

union(other: Dataset[T]): Dataset[T]
Equivalent to UNION ALL in SQL. Note that the merge is by column position:

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.union(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   4|   5|   6|
// +----+----+----+

unionByName(other: Dataset[T]): Dataset[T]
Same as union, but the merge is by column name:

val df1 = Seq((1, 2, 3)).toDF("col0", "col1", "col2")
val df2 = Seq((4, 5, 6)).toDF("col1", "col2", "col0")
df1.unionByName(df2).show

// output:
// +----+----+----+
// |col0|col1|col2|
// +----+----+----+
// |   1|   2|   3|
// |   6|   4|   5|
// +----+----+----+

where(conditionExpr: String): Dataset[T]
where(condition: Column): Dataset[T]
Alias for filter.
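
As mentioned under transform above, a small sketch of chaining a reusable Dataset-to-Dataset function; the helper name is made up:

import org.apache.spark.sql.Dataset

// A hypothetical reusable transformation.
def onlyAdults(ds: Dataset[Person]): Dataset[Person] = ds.filter(_.age >= 18)

peopleDs.transform(onlyAdults).show()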

Untyped transformations

These methods return a DataFrame rather than a typed Dataset.

agg(expr: Column, exprs: Column*): DataFrame
agg(exprs: Map[String, String]): DataFrame
agg(aggExpr: (String, String), aggExprs: (String, String)*): DataFrame
Aggregates over the entire Dataset. ds.agg(...) is shorthand for ds.groupBy().agg(...), e.g.
ds.agg(max($"age"), avg($"salary"))
ds.agg(Map("age" -> "max", "salary" -> "avg"))
ds.agg("age" -> "max", "salary" -> "avg")


apply(colName: String): Column
col(colName: String): Column
Returns the specified column.

colRegex(colName: String): Column
Selects a column based on a regular expression over the column names.

crossJoin(right: Dataset[_]): DataFrame
Cross join (Cartesian product) with another Dataset.

cube(col1: String, cols: String*): RelationalGroupedDataset
cube(cols: Column*): RelationalGroupedDataset
Creates a multi-dimensional cube using the specified columns. //TODO
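
A hedged sketch of cube, which aggregates over every combination of the grouping columns; the data is made up:

import org.apache.spark.sql.functions.sum

val sales = Seq(("NY", 2020, 10), ("NY", 2021, 20), ("SF", 2020, 5))
  .toDF("city", "year", "amount")

// Subtotals for (city, year), (city), (year), plus the grand total.
sales.cube("city", "year").agg(sum("amount")).show()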

drop(col: Column): DataFrame
drop(colNames: String*): DataFrame
drop(colName: String): DataFrame
Drops the specified columns.

groupBy(col1: String, cols: String*): RelationalGroupedDataset
groupBy(cols: Column*): RelationalGroupedDataset
Groups by the specified columns.

join(right: Dataset[_], joinExprs: Column, joinType: String): DataFrame
join(right: Dataset[_], joinExprs: Column): DataFrame
join(right: Dataset[_], usingColumns: Seq[String], joinType: String): DataFrame
join(right: Dataset[_], usingColumns: Seq[String]): DataFrame
join(right: Dataset[_], usingColumn: String): DataFrame
join(right: Dataset[_]): DataFrame
Joins with another DataFrame. joinExprs: e.g. $"df1Key" === $"df2Key"; usingColumns: e.g. Seq("user_id", "user_name"); joinType: defaults to inner and must be one of inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, left_anti.
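
A quick sketch of the untyped join overloads; the column names are illustrative:

val left  = Seq((1, "Ann"), (2, "Bob")).toDF("user_id", "name")
val right = Seq((1, 90), (3, 70)).toDF("user_id", "score")

left.join(right, Seq("user_id")).show()                       // inner join on a shared column
left.join(right, Seq("user_id"), "left_outer").show()         // keep unmatched left rows
left.join(right, left("user_id") === right("user_id"), "full_outer").show()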


na: DataFrameNaFunctions
Returns a DataFrameNaFunctions for working with missing data.

stat: DataFrameStatFunctions
Returns a DataFrameStatFunctions for statistical functions.
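
For example, a hedged sketch with made-up data:

val withNulls = Seq((Some(1), "a"), (None, "b")).toDF("n", "s")

withNulls.na.fill(0).show()     // replace nulls in numeric columns with 0
withNulls.na.drop().show()      // drop rows that contain any null

peopleDf.stat.freqItems(Seq("age")).show()            // approximate frequent items
peopleDf.stat.approxQuantile("age", Array(0.5), 0.0)  // approximate median of age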

rollup(col1: String, cols: String*): RelationalGroupedDataset
rollup(cols: Column*): RelationalGroupedDataset
Rollup aggregation using the specified columns. //TODO

select(col: String, cols: String*): DataFrame
select(cols: Column*): DataFrame
selectExpr(exprs: String*): DataFrame
Selects the specified columns; selectExpr accepts SQL expressions.

withColumn(colName: String, col: Column): DataFrame
Adds a column, or replaces an existing column with the same name.

withColumnRenamed(existingName: String, newName: String): DataFrame
Renames the specified column.
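
A brief sketch of the column-level helpers:

import org.apache.spark.sql.functions.col

peopleDf
  .withColumn("age_next_year", col("age") + 1)               // add a derived column
  .withColumnRenamed("name", "full_name")                    // rename an existing column
  .selectExpr("full_name", "age_next_year * 2 AS doubled")   // SQL expressions
  .show()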

Ungrouped


queryExecution: QueryExecution
The execution plan of this Dataset.

sparkSession: SparkSession
The SparkSession that created this Dataset.

sqlContext: SQLContext
The SQLContext of this Dataset.

toJSON: Dataset[String]
Converts each row to a JSON string.

toString(): String
The usual Any.toString.

Reference

  • Spark API