Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine. The data type returned by Spark SQL is the DataFrame.

1.1.1. Why learn Spark SQL

We have already learned Hive, which converts Hive SQL into MapReduce jobs and submits them to the cluster for execution, greatly simplifying the work of writing MapReduce programs; however, MapReduce is a computing model that executes slowly. Spark SQL instead converts SQL into RDD operations and submits them to the cluster for execution, which is far more efficient.

Hive: simplifies the complexity of writing MapReduce programs

Spark SQL converts SQL to RDD operations: replaces MapReduce to improve efficiency

Spark SQL was introduced in Spark 1.0 and was originally called Shark. Its main optimizations include:

1. In-memory columnar storage – greatly optimizes memory usage, reduces memory consumption, and avoids the GC overhead of large amounts of data (see the caching sketch after this list)

2. Byte-code Generation — Dynamic byte-code generation can be used to optimize performance

3. Optimization of Scala code
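As a small illustration of point 1, Spark SQL keeps a table in this in-memory columnar format when the table is cached. A minimal sketch, assuming a temporary table t_person has already been registered (as in the examples later in this section):

// Cache the table in Spark SQL's in-memory columnar format
sqlContext.cacheTable("t_person")

// ...run queries against the cached, column-oriented data...

// Release the cached columnar data when it is no longer needed
sqlContext.uncacheTable("t_person")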

Structured data is any data that has structure information, that is, a known set of fields shared by every record. When data meets this condition, Spark SQL makes reading and querying it easier and more efficient. Specifically, Spark SQL provides three capabilities (see Figure 9-1).

(1) Spark SQL can read data from various structured data sources, such as JSON, Hive, Parquet, and so on.

(2) Spark SQL not only supports querying data with SQL statements inside Spark programs, but also lets external tools, such as the business-intelligence software Tableau, connect to Spark SQL through standard database connectors (JDBC/ODBC).

(3) When Spark SQL is used inside a Spark program, it provides tight integration between SQL and regular Python, Java, and Scala code, including the ability to join RDDs with SQL tables and to expose custom SQL functions. This makes a lot of work easier.

To provide these capabilities, Spark SQL offers a special kind of RDD called a SchemaRDD (renamed DataFrame in later Spark versions). A SchemaRDD is an RDD of Row objects, each representing a row of records, and it also carries the structural information (that is, the data fields) of those records. A SchemaRDD looks much like an ordinary RDD, but internally it can use the structural information to store data more efficiently. In addition, a SchemaRDD supports operations that are not available on ordinary RDDs, such as running SQL queries. SchemaRDDs can be created from external data sources, from query results, or from ordinary RDDs.
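A minimal sketch of capabilities (1) and (3) together, assuming a hypothetical people.json file (one JSON object per line) and the sqlContext that the Spark 1.6 shell provides:

// Read a structured JSON source into a DataFrame
val peopleDF = sqlContext.read.json("people.json")

// Register it as a temporary table so SQL and regular Scala code can be mixed
peopleDF.registerTempTable("people")
val adults = sqlContext.sql("select name, age from people where age >= 18")

// The result is again a DataFrame and can be processed further or shown
adults.show()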

What is a DataFrame

(The data type returned by Spark SQL: conceptually equivalent to a table in a relational database, but optimized for queries.)

Like an RDD, a DataFrame is a distributed data container. Unlike an RDD, however, a DataFrame is more like a two-dimensional table in a traditional database: in addition to the data itself, it also records information about the structure of the data, called its schema.
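For example, printing the schema of the personDF DataFrame built in the next section shows the field names and types that Spark SQL tracks alongside the data (a sketch of typical output, assuming the id/name/age case class below):

personDF.printSchema
// root
//  |-- id: integer (nullable = false)
//  |-- name: string (nullable = true)
//  |-- age: integer (nullable = false)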

1.1.1. Create DataFrames

In Spark SQL, the SQLContext is the entry point for creating DataFrames and executing SQL. A SQLContext is already built into the Spark 1.6.1 shell (available as sqlContext).

1. Create a local file with id, name, and age columns separated by spaces, and upload it to HDFS:

hdfs dfs -put person.txt /

2. In the Spark shell, run the following command to read the data and split each row on the column separator:

val lineRDD = sc.textFile("hdfs://node01:9000/person.txt").map(_.split(" "))

3. Define the case class (equivalent to the schema of the table)

case class Person(id:Int, name:String, age:Int)

4. Associate the RDD with the case class

val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))

(Each element of lineRDD is an Array[String], so the fields are accessed by index.)

5. Convert the RDD to a DataFrame

val personDF = personRDD.toDF

6. Process the DataFrame

personDF.show

A DataFrame can also be created directly from a local Seq:

val seq1 = Seq(("1","bingbing",35),("2","yuanyuan",34),("3","mimi",33))

val rdd1 = sc.parallelize(seq1)

val df = rdd1.toDF("id","name","age")

df.show

DSL style syntax (DSL: domain-specific language)

// View the contents of the DataFrame

df.show

// View the contents of selected DataFrame columns (three equivalent ways)

1. df.select("name").show

2. df.select(col("name")).show

3. df.select(df("name")).show

// Print the DataFrame's schema information

df.printSchema

// Query the id and name columns, with age incremented by 1

1. df.select(col("id"), col("name"), col("age") + 1).show

2. df.select(df("id"), df("name"), df("age") + 1).show



// Filter rows whose age is at least 35

df.filter(col("age") >= 35).show



// Group by age and count the number of people of the same age

df.groupBy(“age”).count().show()



SQL style syntax

// Query the top two oldest names

1. To use SQL-style syntax, the DataFrame must first be registered as a temporary table:

df.registerTempTable("t_person")

2. sqlContext.sql("select * from t_person order by age desc limit 2").show



// Display the schema information of the table

sqlContext.sql("desc t_person").show

Programmatically execute Spark SQL queries

1. Write Spark SQL query programs

1. Infer Schema from reflection

===========================================================

package com.qf.gp1708.day06

// Infer the schema by reflection from the Godness case class
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}

object InferSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local")
      .setAppName("inferschema")
    val sc = new SparkContext(conf)
    val sqlContext: SQLContext = new SQLContext(sc)

    // 1. Get the data and split each line on ","
    val lineRDD: RDD[Array[String]] = sc.textFile("C://Users/Song/Desktop/person.txt").map(_.split(","))

    // 3. Associate the parsed fields with the Godness sample class
    val godnessRDD: RDD[Godness] = lineRDD.map(x => Godness(x(0).toLong, x(1), x(2).toInt, x(3).toInt))

    // Introduce the implicit conversions so that the toDF method can be called
    import sqlContext.implicits._

    // 4. Convert godnessRDD to a DataFrame
    val godnessDF: DataFrame = godnessRDD.toDF

    // 5. Register a temporary table
    godnessDF.registerTempTable("t_person")

    val sql = "select * from t_person where fv > 70 order by age"

    // Query
    val res: DataFrame = sqlContext.sql(sql)

    res.show()

    sc.stop()
  }
}

// 2. Create the sample (case) class
case class Godness(id: Long, name: String, age: Int, fv: Int)

===========================================================

2. Specify the Schema directly using StructType

===========================================================

package com.qf.gp1708.day06

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Specify the Schema directly using the StructType type
  */
object StructTypeSchema {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("str")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)

    // Get the data and split each line on ","
    val lineRDD: RDD[Array[String]] = sc.textFile("hdfs://...").map(_.split(","))

    // Specify the schema information
    val schema = StructType(List(
      StructField("id", IntegerType, false),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", IntegerType, true)
    ))

    // Map each line to a Row
    val rowRDD: RDD[Row] = lineRDD.map(x => Row(x(0).toInt, x(1), x(2).toInt, x(3).toInt))

    // Convert the RDD to a DataFrame using the schema
    val personDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)

    // Create a temporary table
    personDF.registerTempTable("t_person")

    val sql = "select name,age,fv from t_person where age > 30 order by age desc"

    val res = sqlContext.sql(sql)

    res.write.mode("append").json("c://out-20180903-1")

    sc.stop()
  }
}

===========================================================

1. The data source

1.1. JDBC

Spark SQL can create a DataFrame by reading data from a relational database using JDBC. After a series of calculations are performed on the DataFrame, data can be written back to the relational database.

1.1.1. Loading data from MySQL (Spark Shell)

1. When starting the Spark shell, you must specify the MySQL connector driver jar:

/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \

--master spark://node01:7077 \

--jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \   (specify MySQL connector package)

--driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar   (specify driver classpath)

2. Load data from mysql

val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://node03:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "root")).load()

3. Execute the query

jdbcDF.show()
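After a query like this, the computed result can also be written back to the relational database from the shell; a minimal sketch, where the target table name result is only an example:

import java.util.Properties
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
prop.put("driver", "com.mysql.jdbc.Driver")

// Append the filtered rows to the (example) result table over JDBC
jdbcDF.filter(jdbcDF("age") >= 18).write.mode("append").jdbc("jdbc:mysql://node03:3306/bigdata", "result", prop)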

1.1.2. Write data to MySQL (jar package)

package com.qf.gp1708.day06

import java.util.Properties

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Write data to MySQL
  */
object InsertData2MySQLDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    val lines = sc.textFile("").map(_.split(","))

    // Generate the schema
    val schema = StructType(Array(
      StructField("name", StringType, true),
      StructField("age", IntegerType, true),
      StructField("fv", IntegerType, true)
    ))

    // Map each line to a Row
    val rowRDD: RDD[Row] = lines.map(x => Row(x(0), x(1).toInt, x(2).toInt))

    // Generate the DataFrame
    val personDF = sqlContext.createDataFrame(rowRDD, schema)

    // Generate the configuration for writing to MySQL
    val prop = new Properties()
    prop.put("user", "root")
    prop.put("password", "root")
    prop.put("driver", "com.mysql.jdbc.Driver")
    val jdbcUrl = "jdbc:mysql://hadoop03:3306/bigdata"
    val table = "person"

    // Write the data to MySQL in append mode
    personDF.write.mode("append").jdbc(jdbcUrl, table, prop)

    sc.stop()
  }
}

/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit \

--class com.qf….. \

--master spark://hadoop01:7077 \

--executor-memory 512m \

--total-executor-cores 2 \

--jars /usr/…/mysql-connector-java-5.1.35-bin.jar \

--driver-class-path /usr/…/mysql-connector-java-5.1.35-bin.jar \

/root/1.jar

===========================================================

Kafka: message middleware (caches/buffers data), used for decoupling

It provides a unified, high-throughput, low-latency platform for processing real-time data.

3. Why message queues are needed (important, understand this)

At the heart of a messaging system are three things: decoupling, asynchrony, and parallelism



Kafka stores messages by topic.

Topic: under the hood each topic is a queue; different kinds of messages are categorized into different queues.
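A minimal producer sketch, assuming the kafka-clients Java API is on the classpath, a broker at node01:9092, and an example topic name t_log (broker address and topic name are assumptions, not values from this material):

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node01:9092")  // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // The message is appended to the "t_log" topic (its underlying queue); subscribed consumers receive it
    producer.send(new ProducerRecord[String, String]("t_log", "key1", "hello kafka"))
    producer.close()
  }
}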

  

Publish/subscribe: one to many
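To illustrate the one-to-many publish/subscribe model, a minimal consumer sketch under the same assumptions as the producer above; every consumer group that subscribes to the topic receives its own copy of each message:

import java.util.{Arrays, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import scala.collection.JavaConversions._

object ConsumerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node01:9092")  // assumed broker address
    props.put("group.id", "group-a")               // another group, e.g. "group-b", would also receive every message
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")

    val consumer = new KafkaConsumer[String, String](props)
    consumer.subscribe(Arrays.asList("t_log"))     // subscribe to the example topic
    while (true) {
      // Poll for newly published records and print them
      for (record <- consumer.poll(100)) {
        println(s"${record.key()} -> ${record.value()}")
      }
    }
  }
}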

  

JMS: