Spark SQL is the Spark module for processing structured data. It provides a programming abstraction called DataFrame and acts as a distributed SQL query engine. The data type returned by Spark SQL is the DataFrame.
1.1.1. Why learn Spark SQL
We have already learned Hive, which converts Hive SQL into MapReduce jobs and submits them to the cluster for execution, greatly reducing the complexity of writing MapReduce programs; MapReduce, however, is a computing model that executes slowly. Spark SQL instead converts SQL into RDD operations and submits them to the cluster for execution, so queries run much faster.
Hive: reduces the complexity of writing MapReduce programs
Spark SQL (converted to RDD operations): replaces MapReduce and improves efficiency
Spark SQL was introduced in Spark 1.0; its predecessor was the Shark project.
1. In-memory column storage – greatly optimizes memory usage, reduces memory consumption, and avoids the performance overhead of GC for large amounts of data
2. Byte-code Generation — Dynamic byte-code generation can be used to optimize performance
3. Optimization of Scala code
Structured data is any data that carries structural information, i.e. a set of known fields shared by every record. When data meets this condition, Spark SQL makes reading and querying it both easier and more efficient. Specifically, Spark SQL provides three capabilities (see Figure 9-1).
(1) Spark SQL can read data from various structured data sources, such as JSON, Hive, Parquet, and so on.
(2) Spark SQL not only supports querying data with SQL statements inside Spark programs, but also allows external tools, such as the business-intelligence software Tableau, to connect to Spark SQL through standard database connectors (JDBC/ODBC).
(3) When Spark SQL is used inside a Spark program, it integrates tightly with ordinary Python, Java, and Scala code, including the ability to join RDDs with SQL tables and to expose custom SQL functions. This makes many tasks easier.
To provide these capabilities, Spark SQL offers a special type of RDD called SchemaRDD. A SchemaRDD is an RDD of Row objects, each representing one record, and it also carries the structural information (i.e. the data fields) of those records. A SchemaRDD looks much like a regular RDD, but internally it can store data more efficiently by exploiting the structural information. It also supports operations that are not available on ordinary RDDs, such as running SQL queries. SchemaRDDs can be created from external data sources, from query results, or from ordinary RDDs.
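To illustrate point (1) above, here is a minimal sketch of loading structured sources with the Spark 1.6 DataFrameReader API; the file paths are hypothetical and the shell's built-in sqlContext is assumed:
// Read a JSON file; the schema is inferred from the JSON fields
val jsonDF = sqlContext.read.json("hdfs://node01:9000/people.json")
// Read a Parquet file; the schema comes from the Parquet metadata
val parquetDF = sqlContext.read.parquet("hdfs://node01:9000/people.parquet")
// Both results are DataFrames whose structure can be inspected directly
jsonDF.printSchema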
What is a DataFrame
(The data type returned by Spark SQL: conceptually equivalent to a table in a relational database, but optimized for queries.)
Like an RDD, a DataFrame is a distributed data container. However, a DataFrame is more like a two-dimensional table in a traditional database: in addition to the data, it also records information about the structure of the data, called a schema.
1.1.1. Create DataFrames
In Spark SQL, SQLContext is the entry point for creating DataFrames and executing SQL. In the Spark 1.6.1 shell, a SQLContext has already been created for you (available as sqlContext).
1. Create a local file with three columns, id, name, and age, separated by spaces, and upload it to HDFS
hdfs dfs -put person.txt /
2. In the Spark shell, run the following command to read the data and split each line on the column separator
val lineRDD = sc.textFile("hdfs://node01:9000/person.txt").map(_.split(" "))
3. Define the case class (equivalent to the schema of the table)
case class Person(id:Int, name:String, age:Int)
4. Associate the RDD with the case class
val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
(each element of lineRDD is an Array[String], so fields are accessed by index)
5. Convert the RDD to a DataFrame
val personDF = personRDD.toDF
6. Process the DataFrame
personDF.show
A DataFrame can also be created directly from a local Seq, with column names passed to toDF:
val seq1 = Seq(("1","bingbing",35),("2","yuanyuan",34),("3","mimi",33))
val rdd1 = sc.parallelize(seq1)
val df = rdd1.toDF("id","name","age")
df.show
DSL-style syntax (DSL: domain-specific language)
// Inspect the DataFrame with DSL-style operations (see the sketch below)
// 1. View the contents of the DataFrame
// 2. View the contents of selected columns of the DataFrame
// 3. Print the DataFrame's schema information
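A minimal sketch of these three operations, assuming the DataFrame df created from seq1 above (the same calls work on personDF):
// 1. View the contents of the DataFrame
df.show
// 2. View the contents of selected columns (the column names given to toDF)
df.select("name", "age").show
// 3. Print the DataFrame's schema information
df.printSchema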
// Query all names and ages, and age+1
1. df.select(col("id"), col("name"), col("age") + 1).show
2. df.select(df("id"), df("name"), df("age") + 1).show
// Filter rows where age >= 35
df.filter(col("age") >= 35).show
// Group by age and count the number of people of the same age
df.groupBy("age").count().show()
SQL-style syntax
// Query the two oldest records
1. To use SQL-style syntax, first register the DataFrame as a temporary table
df.registerTempTable("t_person")
2. sqlContext.sql("select * from t_person order by age desc limit 2").show
// Display the Schema information of the table
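A minimal sketch for this, assuming the t_person temporary table registered above; sqlContext.table returns the DataFrame behind a registered table, and printSchema displays its schema:
sqlContext.table("t_person").printSchema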
Programmatically execute Spark SQL queries
1. Write Spark SQL query programs
1. Infer the schema by reflection
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
package com.qf.gp1708.day06
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
object InferSchema {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setMaster("local")
.setAppName("inferschema")
val sc = new SparkContext(conf)
val sqlContext:SQLContext = new SQLContext(sc)
// 1. Read the file and split each line on the comma separator
val lineRDD: RDD[Array[String]] = sc.textFile("C://Users/Song/Desktop/person.txt").map(_.split(","))
// 2. Associate the RDD with the case class
val godnessRDD: RDD[Godness] = lineRDD.map(arr => Godness(arr(0).toLong, arr(1), arr(2).toInt, arr(3).toInt))
// 3. Import the implicit conversions that provide toDF
import sqlContext.implicits._
// 4. Convert the RDD to a DataFrame
val godnessDF: DataFrame = godnessRDD.toDF()
// 5. Register the DataFrame as a temporary table and query it with SQL
godnessDF.registerTempTable("t_person")
val sql = "select * from t_person where fv > 70 order by age"
val res: DataFrame = sqlContext.sql(sql)
res.show()
sc.stop()
}
}
// Define the case class (equivalent to the schema of the table), placed outside the object
case class Godness(id:Long,name:String,age:Int,fv:Int)
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
2. Specify the Schema directly using StructType
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
package com.qf.gp1708.day06
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
object StructTypeSchema {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("str")
.setMaster("local")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
// 1. Read the file and split each line on the comma separator
val lineRDD: RDD[Array[String]] = sc.textFile("hdfs://…").map(_.split(","))
// 2. Specify the schema directly with StructType
val schema: StructType = StructType(List(
  StructField("id", IntegerType, false),
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("fv", IntegerType, true)
))
// 3. Map each Array[String] to a Row that matches the schema
val rowRDD: RDD[Row] = lineRDD.map(arr => Row(arr(0).toInt, arr(1), arr(2).toInt, arr(3).toInt))
// 4. Create the DataFrame from the RDD[Row] and the schema
val personDF: DataFrame = sqlContext.createDataFrame(rowRDD, schema)
// 5. Register a temporary table and query it with SQL
personDF.registerTempTable("t_person")
val sql = "select name,age,fv from t_person where age > 30 order by age desc"
val res = sqlContext.sql(sql)
res.write.mode("append").json("c://out-20180903-1")
sc.stop()
}
}
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
1. Data sources
1.1. JDBC
Spark SQL can create a DataFrame by reading data from a relational database using JDBC. After a series of calculations are performed on the DataFrame, data can be written back to the relational database.
1.1.1. Loading data from MySQL (Spark Shell)
1. To start the Spark shell, you must specify the MySQL connector driver jar
/usr/local/spark-1.6.1-bin-hadoop2.6/bin/spark-shell \
--master spark://node01:7077 \
--jars /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar \
(specify the MySQL connector jar)
--driver-class-path /usr/local/spark-1.6.1-bin-hadoop2.6/mysql-connector-java-5.1.35-bin.jar (specify the driver classpath)
2. Load data from MySQL
val jdbcDF = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:mysql://node03:3306/bigdata", "driver" -> "com.mysql.jdbc.Driver", "dbtable" -> "person", "user" -> "root", "password" -> "root")).load()
3. Execute the query
jdbcDF.show()
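For the write-back direction mentioned at the start of this section, a minimal sketch from the same shell session; the target table name person_copy and the reuse of the root credentials are assumptions:
import java.util.Properties
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
prop.put("driver", "com.mysql.jdbc.Driver")
// Append the (possibly transformed) DataFrame to a hypothetical target table
jdbcDF.write.mode("append").jdbc("jdbc:mysql://node03:3306/bigdata", "person_copy", prop)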
1.1.2. Write data to MySQL (jar package)
package com.qf.gp1708.day06
import java.util.Properties
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.{SparkConf, SparkContext}
object InsertData2MySQLDemo {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("").setMaster("local[2]")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
val lines = sc.textFile("").map(_.split(","))
// Specify the schema directly with StructType
val schema: StructType = StructType(List(
  StructField("name", StringType, true),
  StructField("age", IntegerType, true),
  StructField("fv", IntegerType, true)
))
// Map each split line to a Row that matches the schema
val rowRDD: RDD[Row] = lines.map(arr => Row(arr(0), arr(1).toInt, arr(2).toInt))
// Create the DataFrame
val personDF = sqlContext.createDataFrame(rowRDD, schema)
// JDBC connection properties
val prop = new Properties()
prop.put("user", "root")
prop.put("password", "root")
prop.put("driver", "com.mysql.jdbc.Driver")
val jdbcUrl = "jdbc:mysql://hadoop03:3306/bigdata"
val table = "person"
// Append the DataFrame to the MySQL table
personDF.write.mode("append").jdbc(jdbcUrl, table, prop)
sc.stop()
}
}
/usr/local/spark-1.6.3-bin-hadoop2.6/bin/spark-submit \
--class com.qf.… \
--master spark://hadoop01:7077 \
--executor-memory 512m \
--total-executor-cores 2 \
--jars /usr/…/mysql-connector-java-5.1.35-bin.jar \
--driver-class-path /usr/…/mysql-connector-java-5.1.35-bin.jar \
/root/1.jar
= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =
Kafka: message middleware that buffers data and decouples systems
It provides a unified, high-throughput, low-latency platform for processing real-time data
3. Why message queues are needed (important; understand this)
At the heart of a messaging system are three things: decoupling, asynchrony, and parallelism
Kafka stores messages by topic
Topic: implemented on top of queues; different categories of messages go into different queues
Publish/subscribe: one-to-many
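A minimal producer sketch in Scala to illustrate publishing to a topic; the broker address node01:9092 and the topic name test_topic are assumptions, and the Kafka client library must be on the classpath:
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object KafkaProducerSketch {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node01:9092") // assumed broker address
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // Messages are published to a topic; any number of consumer groups can subscribe to it (one-to-many)
    producer.send(new ProducerRecord[String, String]("test_topic", "key1", "hello kafka"))
    producer.close()
  }
}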
JMS: