0. Environment preparation

  1. Integrate Scala into IDEA

  2. Add the Spark dependency

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
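If your build uses sbt rather than Maven (an alternative setup, not part of the original project), the equivalent dependency would be:

// sbt equivalent of the Maven dependency above; %% appends the Scala
// binary version (2.12) to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-core" % "3.0.0"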

1. Normal version

Text explanation:

/**
 * Normal version of WordCount -- count the size of each group
 *
 * 1. Read the file and get the data line by line
 *    "hello world"
 *    "hello java"
 *    "hello scala"
 *
 * 2. Split into words
 *    ["hello", "hello", "hello", "world", "java", "scala"]
 *
 * 3. Group
 *    ("hello", Iterable(hello, hello, hello))
 *    ("world", Iterable(world))
 *    ("java",  Iterable(java))
 *    ("scala", Iterable(scala))
 *
 * 4. Convert
 *    ("hello", Iterable(hello, hello, hello)) --> ("hello", 3)
 *    ("world", Iterable(world))               --> ("world", 1)
 *    ("java",  Iterable(java))                --> ("java", 1)
 *    ("scala", Iterable(scala))               --> ("scala", 1)
 */

Illustration:

Code:

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
    // TODO 1. Establish a connection with the Spark framework
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("wordCount")
    val sc = new SparkContext(sparkConf)

        
    // TODO 2
    // 1. Read the file and obtain the data line by line
    val lines: RDD[String] = sc.textFile("datas/")
    "Hello world" => "hello", "world"
    val words: RDD[String] = lines.flatMap(word => word.split(""))
    < hello, Iterable(hello, hello, hello)>
    val wordGroup: RDD[(String, Iterable[String])] = words.groupBy(word => word)
        
    // 4. Convert the grouped data
    // < hello, Iterable(hello, hello, hello)> ==> <hello, 3>
    val wordToCount: RDD[(String, Int)] = wordGroup.map(
      kv => (kv._1, kv._2.size)
    )
        
    // collect: trigger job execution
    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

        
    //TODO 3. Close resources
    sc.stop()
  }

Step 4 above can be written more succinctly with pattern matching:

val wordToCount: RDD[(String, Int)] = wordGroup.map {	// Notice the outermost {}
  case (word, list) => {
    (word, list.size)
  }
}
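Since only the value of each pair changes, mapValues is another concise option here; it is the same operator used in the operator summary at the end of this post:

// mapValues transforms only the value of each (key, value) pair
val wordToCount: RDD[(String, Int)] = wordGroup.mapValues(list => list.size)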

2. Classic version

Text explanation:

/**
 * Recommended WordCount -- reduce aggregation
 *
 * 1. Read the file and get the data line by line
 *    "hello world"
 *    "hello java"
 *    "hello scala"
 *
 * 2. Split into words
 *    ["hello", "hello", "hello", "world", "java", "scala"]
 *
 * 3. Convert
 *    (hello, 1) (hello, 1) (hello, 1)
 *    (world, 1) (java, 1) (scala, 1)
 *
 * 4. Group
 *    hello --> Iterable((hello, 1), (hello, 1), (hello, 1))
 *    world --> Iterable((world, 1))
 *    java  --> Iterable((java, 1))
 *    scala --> Iterable((scala, 1))
 *
 * 5. Aggregate
 *    (hello, 1) (hello, 1) (hello, 1) ==> (hello, 3)
 *    (world, 1)
 *    (java, 1)
 *    (scala, 1)
 */

Illustration:

Code:

def main(args: Array[String]): Unit = {
    // TODO 1. Establish a connection with the Spark framework
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)


    // TODO 2
    // 1. Read the file and obtain the data line by line
    val lines: RDD[String] = sc.textFile("datas/")
    // 2. Split into words: split + flatten
    val words: RDD[String] = lines.flatMap(_.split(" "))
    // 3. Convert for easier counting: hello => (hello, 1)
    val wordToOne: RDD[(String, Int)] = words.map(
      word => (word, 1)
    )
    // 4. Group and aggregate the converted data
    // Grouping: (hello,1) -> Iterable((hello,1)... (hello,1))
    val wordGroup: RDD[(String, Iterable[(String, Int)])] = wordToOne.groupBy(
      t => t._1
    )
    // Iterable((hello,1),... (hello,1)) ==reduce==> (hello, 4)
    val wordToCount: RDD[(String, Int)] = wordGroup.map {
      case (word, list) => {
        val tuple: (String, Int) = list.reduce(
          (t1, t2) => {
            (t1._1, t2._2 + t1._2)
          }
        )
        tuple
      }
    }

    // collect: trigger job execution
    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

        
    //TODO 3. Close resources
    sc.stop()
  }

3. Spark simplified version

Tip: simply replace step 4 of the classic version (group and aggregate) with reduceByKey, an API specific to Spark.

Code:

def main(args: Array[String]): Unit = {
    // TODO 1. Establish a connection with the Spark framework
    val sparkConf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(sparkConf)

    // TODO 2
    val lines: RDD[String] = sc.textFile("datas")
    val words: RDD[String] = lines.flatMap(_.split(" "))
    val wordToOne: RDD[(String, Int)] = words.map(
      word => (word, 1)
    )
    /* Spark groups and aggregates in a single method: reduceByKey().
       It aggregates the values that share the same key; the parameter
       func: (Int, Int) => Int is the aggregation logic for the values. */
    val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey((x, y) => x + y)

    val array: Array[(String, Int)] = wordToCount.collect()
    array.foreach(println)

    //TODO 3. Close resources
    sc.stop()
  }
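The aggregation function can also be written with Scala placeholder syntax, as the operator summary below does:

// Equivalent: placeholder syntax for (x, y) => x + y
val wordToCount: RDD[(String, Int)] = wordToOne.reduceByKey(_ + _)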

The final output is:

(hello,4)
(world,2)
(spark,2)
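For reference, output like this would be produced if the files under datas together contained, say, the following lines (the original post does not show the input files, so this input is an illustrative assumption):

hello world
hello spark
hello world
hello spark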

Spark provides a rich set of RDD operators. Using WordCount as an example, here is a summary of other operators that can solve the same problem:

  • Grouping
// groupBy: "hello" -> Iterable("hello", "hello",...)
def wordcount1(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val group: RDD[(String, Iterable[String])] = words.groupBy(word => word)
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}

// groupByKey: "hello" -> Iterable(1, 1,...)
def wordcount2(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))		// ("hello", 1)
    val group: RDD[(String, Iterable[Int])] = wordOne.groupByKey()
    val wordCount: RDD[(String, Int)] = group.mapValues(iter => iter.size)
}
  • Reduction
def wordcount3(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))		// ("hello", 1)
    // reduceByKey
    val wordCount1: RDD[(String, Int)] = wordOne.reduceByKey(_ + _)
    // aggregateByKey: zero value, then intra-partition and inter-partition functions
    val wordCount2: RDD[(String, Int)] = wordOne.aggregateByKey(0)(_ + _, _ + _)
    // foldByKey: like aggregateByKey, with one function for both phases
    val wordCount3: RDD[(String, Int)] = wordOne.foldByKey(0)(_ + _)
    // combineByKey: createCombiner, mergeValue, mergeCombiners
    val wordCount4: RDD[(String, Int)] = wordOne.combineByKey(
        v => v,
        (x: Int, y: Int) => x + y,
        (x: Int, y: Int) => x + y
    )
}
  • Count
// countByKey
def wordcount7(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordOne = words.map((_, 1))		// ("hello", 1)
    val wordCount: collection.Map[String, Long] = wordOne.countByKey()
}

// countByValue
def wordcount8(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))
    val wordCount: collection.Map[String, Long] = words.countByValue()
}
  • Merging maps
import scala.collection.mutable

def wordcount9(sc: SparkContext): Unit = {
    val rdd = sc.makeRDD(List("Hello Scala", "Hello Spark"))
    val words = rdd.flatMap(_.split(" "))

    // Each word becomes a single-entry map: Map("hello" -> 1)
    val mapRdd: RDD[mutable.Map[String, Long]] = words.map(
        word => mutable.Map(word -> 1L)
    )

    // Merge the maps two at a time
    val value: mutable.Map[String, Long] = mapRdd.reduce(
        (map1, map2) => {
            map2.foreach {
                case (word, count) => {
                    val newCount = map1.getOrElse(word, 0L) + count
                    map1.update(word, newCount)		// Add or update the KV pair
                }
            }
            map1
        }
    )
    println(value)  // Map(Hello -> 2, Scala -> 1, Spark -> 1)
}

4. Summary

Version            Approach
Normal version     Group, then count the size of each group
Classic version    Convert to (word, 1), then reduce

The classic version captures the core idea of WordCount: partial results such as ("hello", 10) and ("hello", 20) can be further reduced to ("hello", 30), whereas the normal version's grouped collections cannot be merged this way, which limits it.
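A minimal sketch of why the (word, count) representation composes, using plain Scala collections rather than RDDs:

// Partial counts for the same key merge by simply adding the values
val partial = List(("hello", 10), ("hello", 20))
val merged = partial.reduce((t1, t2) => (t1._1, t1._2 + t2._2))
println(merged)  // (hello,30)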