0. Environment preparation
- Integrate Scala with IDEA
- Add the Spark dependency:
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
```
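Note that the artifact `spark-core_2.12` is built against Scala 2.12, so the project's Scala SDK should be a 2.12.x release.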
1. Normal version
Text explanation:
```scala
/**
 * Normal version of WordCount -- count the size of each group
 *
 * 1. Read the file and get the data line by line
 *    "hello world"
 *    "hello java"
 *    "hello scala"
 *
 * 2. Split into words
 *    ["hello", "hello", "hello", "world", "java", "scala"]
 *
 * 3. Group
 *    ("hello", Iterable(hello, hello, hello))
 *    ("world", Iterable(world))
 *    ("java",  Iterable(java))
 *    ("scala", Iterable(scala))
 *
 * 4. Convert
 *    ("hello", Iterable(hello, hello, hello)) --> ("hello", 3)
 *    ("world", Iterable(world))               --> ("world", 1)
 *    ("java",  Iterable(java))                --> ("java", 1)
 *    ("scala", Iterable(scala))               --> ("scala", 1)
 */
```
Illustration:
Code:
```scala
def main(args: Array[String]): Unit = {
    // TODO 1. Establish a connection with the Spark framework
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    val sc = new SparkContext(sparkConf)

    // TODO 2. Execute the business logic
    // 1. Read the file and obtain the data line by line
    val lines: RDD[String] = sc.textFile("datas/")

    // 2. Split each line into words: "hello world" => "hello", "world"
    val words: RDD[String] = lines.flatMap(line => line.split(" "))

    // 3. Group the words: hello => <hello, Iterable(hello, hello, hello)>
    val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)

    // 4. Convert the grouped data
    //    <hello, Iterable(hello, hello, hello)> ==> <hello, 3>
    val wordToCount: RDD[(String, Int)] = wordGroup.map(
      kv => (kv._1, kv._2.size)
    )

    // collect triggers the job execution
    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO 3. Close resources
    sc.stop()
}
```
Step 4 above, write more succinctly with pattern matching:
```scala
val wordToCount: RDD[(String, Int)] = wordGroup.map { // note the outermost {}
    case (word, list) => {
      (word, list.size)
    }
}
```
2. Classic version
Text explanation:
```scala
/**
 * Recommended WordCount -- reduce aggregation
 *
 * 1. Read the file and get the data line by line
 *    "hello world"
 *    "hello java"
 *    "hello scala"
 *
 * 2. Split into words
 *    ["hello", "hello", "hello", "world", "java", "scala"]
 *
 * 3. Convert
 *    (hello, 1) (hello, 1) (hello, 1)
 *    (world, 1) (java, 1) (scala, 1)
 *
 * 4. Group
 *    hello --> Iterable((hello, 1), (hello, 1), (hello, 1))
 *    world --> Iterable((world, 1))
 *    java  --> Iterable((java, 1))
 *    scala --> Iterable((scala, 1))
 *
 * 5. Aggregate
 *    (hello, 1) (hello, 1) (hello, 1) ==> (hello, 3)
 *    (world, 1)
 *    (java, 1)
 *    (scala, 1)
 */
```
Illustration:
Code:
```scala
def main(args: Array[String]): Unit = {
    // TODO 1. Establish a connection with the Spark framework
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // TODO 2. Execute the business logic
    // 1. Read the file and obtain the data line by line
    val lines: RDD[String] = sc.textFile("datas/")

    // 2. Split into words: split + flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))

    // 3. Convert for easier counting: hello => (hello, 1)
    val wordToOne: RDD[(String, Int)] = words.map(
      word => (word, 1)
    )

    // 4. Group and aggregate the converted data
    //    Grouping: (hello,1) -> Iterable((hello,1), ..., (hello,1))
    val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(
      t => t._1
    )

    //    Aggregation: Iterable((hello,1), ..., (hello,1)) ==reduce==> (hello, 4)
    val wordToCount: RDD[(String, Int)] = wordGroup.map {
      case (word, list) => {
        val tuple: (String, Int) = list.reduce(
          (t1, t2) => {
            (t1._1, t2._2 + t1._2)
          }
        )
        tuple
      }
    }

    // collect triggers the job execution
    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO 3. Close resources
    sc.stop()
}
```
3. Spark simplified version
Tip: simply replace step 4 of the classic version (group and aggregate) with reduceByKey, an API specific to Spark.
Code:
```scala
def main(args: Array[String]): Unit = {
    // TODO 1. Establish a connection with the Spark framework
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // TODO 2. Execute the business logic
    val lines: RDD[String] = sc.textFile("datas")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map(
      word => (word, 1)
    )

    /* Spark groups and aggregates in one step with reduceByKey():
       it aggregates the values of the same key.
       The parameter func: (Int, Int) => Int is the aggregation logic for the values. */
    val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey((x, y) => x + y)

    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    // TODO 3. Close resources
    sc.stop()
}
```
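As a side note, `reduceByKey((x, y) => x + y)` can be written more tersely as `reduceByKey(_ + _)`, the form used in the operator summary below.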
The final output is:
```
(hello,4)
(world,2)
(spark,2)
```
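These counts depend on whatever files sit under datas/; a hypothetical input that would produce exactly this output is a file containing:

```
hello world
hello spark
hello world
hello spark
```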
Spark provides a variety of RDD operators. Using WordCount as the example, here is a summary of other operators that can solve the same problem:
- grouping
```scala
// groupBy: "hello" -> Iterable("hello", "hello", ...)
def wordcount1(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    // groupBy keeps the whole element as the value
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}

// groupByKey: "hello" -> Iterable(1, 1, ...)
def wordcount2(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1)) // ("hello", 1)
    // groupByKey groups only the values of the same key
    val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
```
- reduction
```scala
def wordcount3(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1)) // ("hello", 1)

    // reduceByKey
    val wordCount1: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)

    // aggregateByKey: zero value, then intra-partition and inter-partition functions
    val wordCount2: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)

    // foldByKey: like aggregateByKey with the same function for both phases
    val wordCount3: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)

    // combineByKey: createCombiner, mergeValue, mergeCombiners
    val wordCount4: RDD[(String, Int)] = wordOne.combineByKey(
      v => v,
      (x: Int, y) => x + y,
      (x: Int, y: Int) => x + y
    )
}
```
- count
```scala
// countByKey
def wordcount7(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1)) // ("hello", 1)
    // countByKey is an action: it returns a Map to the driver directly
    val wordCount: collection.Map[String, Long] = wordOne.countByKey()
}

// countByValue
def wordcount8(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    // countByValue counts how many times each element occurs
    val wordCount: collection.Map[String, Long] = words.countByValue()
}
```
- map merging
```scala
// requires: import scala.collection.mutable
def wordcount9(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    // Wrap every word in a single-entry mutable Map: Map("hello" -> 1)
    val mapRdd: RDD[mutable.Map[String, Long]] = words.map(
      word => mutable.Map(word -> 1L)
    )
    // Merge two maps at a time
    val value: mutable.Map[String, Long] = mapRdd.reduce(
      (map1, map2) => {
        map2.foreach {
          case (word, count) => {
            val newCount = map1.getOrElse(word, 0L) + count
            map1.update(word, newCount) // Add or update the KV pair
          }
        }
        map1
      }
    )
    println(value) // Map(Hello -> 2, Scala -> 1, Spark -> 1)
}
```
4. Summary
| Version | Approach |
| --- | --- |
| Normal version | Group, then count the size of each group |
| Classic version | Convert to (word, 1), then aggregate with reduce |
The classic version captures the core idea of WordCount: partial results such as ("hello", 10) and ("hello", 20) can be reduced to ("hello", 30), whereas the normal version only counts individual elements and cannot merge pre-aggregated data.
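A minimal sketch of that point (the partial counts and names below are hypothetical, chosen only for illustration): reduceByKey merges the values of the same key even when they are already partial sums, which the size-of-group approach cannot express.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartialCountMerge {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[*]").setAppName("partialCountMerge"))
    // Hypothetical partial counts, e.g. results of two earlier aggregations
    val partial = sc.makeRDD(List(("hello", 10), ("hello", 20), ("world", 5)))
    // reduceByKey adds the values of the same key: ("hello", 30), ("world", 5)
    partial.reduceByKey(_ + _).collect().foreach(println)
    sc.stop()
  }
}
```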