“This is the 39th day of my participation in the November Writing Challenge. See details: The Last Writing Challenge of 2021.”

I. RDD Transformation Operators

1. Double Value type

1.1, intersection

  1. Function signatures

    def intersection(other: RDD[T]): RDD[T]
  2. Function description

    Returns a new RDD containing the intersection of the source RDD and the argument RDD.

    // Imports shared by the examples in this article
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - double-Value type - intersection
      // intersection requires the two RDDs to have the same element type

      val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
      val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

      // intersection: [3, 4]
      val rdd3: RDD[Int] = rdd1.intersection(rdd2)
      println(rdd3.collect().mkString(","))

      sc.stop()

    }
  3. Consider a question: What if the data types of the two RDDs are inconsistent?
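
    A quick hedged illustration of the comment in the code above (a sketch of mine, not from the original post): because the signature is intersection(other: RDD[T]), the two RDDs must have the same element type, so a mixed-type call should not even compile.

    // Illustrative only: rdd1 is RDD[Int], so intersecting it with an RDD[String]
    // is expected to fail at compile time with a type mismatch.
    // val bad = rdd1.intersection(sc.makeRDD(List("3", "4", "5", "6")))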

1.2, union

  1. Function signatures

    def union(other: RDD[T]): RDD[T]
  2. Function description

    Returns a new RDD containing the union of the source RDD and the argument RDD (duplicate elements are kept).

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - double-Value type - union
      // union requires the two RDDs to have the same element type
      // (zip, by contrast, allows the two RDDs to have different element types)

      val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
      val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

      // union: [1, 2, 3, 4, 3, 4, 5, 6]
      val rdd4: RDD[Int] = rdd1.union(rdd2)
      println(rdd4.collect().mkString(","))

      sc.stop()

    }
  3. Consider a question: What if the data types of the two RDDs are inconsistent?

1.3, subtract

  1. Function signatures

    def subtract(other: RDD[T]): RDD[T]
  2. Function description

    Takes the source RDD as the primary set, removes the elements that also appear in the argument RDD, and keeps the rest, i.e., the set difference.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - double-Value type - subtract
      // subtract requires the two RDDs to have the same element type

      val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
      val rdd2 = sc.makeRDD(List(3, 4, 5, 6))

      // subtract (set difference): [1, 2]
      val rdd5: RDD[Int] = rdd1.subtract(rdd2)
      println(rdd5.collect().mkString(","))

      sc.stop()

    }
  3. Consider a question: What if the data types of the two RDDs are inconsistent?

1.4, zip

  1. Function signatures

    def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
  2. Function description

    Combines the elements of the two RDDs into key-value pairs: the key is the element from the first RDD and the value is the element at the same position in the second RDD.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - double-Value type - zip
      // intersection, union, and subtract require the two RDDs to have the same element type
      // zip allows the two RDDs to have different element types

      val rdd1 = sc.makeRDD(List(1, 2, 3, 4))
      val rdd2 = sc.makeRDD(List(3, 4, 5, 6))
      val rdd7 = sc.makeRDD(List("3", "4", "5", "6"))

      // zip: [(1,3), (2,4), (3,5), (4,6)]
      val rdd6: RDD[(Int, Int)] = rdd1.zip(rdd2)
      println(rdd6.collect().mkString(","))

      val rdd8 = rdd1.zip(rdd7)
      println(rdd8.collect().mkString(","))

      sc.stop()

    }
  3. Consider a question: What if the data types of the two RDDs are inconsistent?

    Inconsistent types are fine: zip still works!

  4. Consider a question: What if the two RDDs have different numbers of partitions?

    The two RDDs must have the same number of partitions; if they differ, an error is reported.

  5. Consider a question: What if the partitions of the two RDDs contain different amounts of data?

    The two RDDs must have the same number of elements in each partition; if they differ, an error is reported!
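
    A small hedged sketch of the two failure cases above (an illustration of mine, not from the original post; the exact exception type and message depend on the Spark version):

    // Same element count but different numbers of partitions: expected to fail.
    val p1 = sc.makeRDD(List(1, 2, 3, 4), 2)
    val p2 = sc.makeRDD(List(1, 2, 3, 4), 4)
    // p1.zip(p2).collect()

    // Same number of partitions but different element counts per partition: expected to fail at runtime.
    val p3 = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    // p1.zip(p3).collect()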

2. Key-value type

2.1, partitionBy

  1. Function signatures

    def partitionBy(partitioner: Partitioner): RDD[(K, V)]
  2. Function description

    Repartitions the data according to the specified Partitioner. Spark's default partitioner is HashPartitioner.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - partitionBy
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)

      val mapRDD: RDD[(Int, Int)] = rdd.map((_, 1))
      // RDD => PairRDDFunctions: implicit conversion applied by the compiler

      // partitionBy repartitions the data according to the specified partitioning rule
      val newRDD = mapRDD.partitionBy(new HashPartitioner(2))

      newRDD.saveAsTextFile("output")

      sc.stop()

    }
  3. Consider a question: What if the partitioner used for repartitioning is the same as the RDD's current partitioner?

  4. Consider a question: Does Spark provide any other partitioners?

  5. Consider a question: What if you want to partition data in your own way?
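
    For question 5, a minimal custom Partitioner sketch (an illustration of mine, not from the original post): extend org.apache.spark.Partitioner and implement numPartitions and getPartition. For question 3, my understanding (worth verifying against your Spark version) is that partitionBy returns the original RDD unchanged when the new partitioner equals the RDD's current one.

    import org.apache.spark.Partitioner

    // Illustrative custom partitioner: even Int keys go to partition 0, everything else to partition 1.
    class MyPartitioner extends Partitioner {
      override def numPartitions: Int = 2
      override def getPartition(key: Any): Int = key match {
        case i: Int if i % 2 == 0 => 0
        case _                    => 1
      }
    }

    // Usage sketch with the mapRDD from the example above:
    // mapRDD.partitionBy(new MyPartitioner).collect().foreach(println)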

2.2, reduceByKey

  1. Function signatures

    def reduceByKey(func: (V, V) => V): RDD[(K, V)]

    def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
  2. Function description

    Aggregates the values of elements that share the same key.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - reduceByKey
      val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))

      // reduceByKey: aggregates the values of elements that share the same key
      // In Scala, aggregation is generally done two values at a time; Spark is built on Scala,
      // so it aggregates the same way:
      // [1, 2, 3]
      // [3, 3]
      // [6]
      // If a key has only one value, that value does not go through the reduce function.
      val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x: Int, y: Int) => {
        println(s"x = ${x}, y = ${y}")
        x + y
      })

      reduceRDD.collect().foreach(println)

      sc.stop()

    }
  3. A small exercise: WordCount
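
    A minimal WordCount sketch based on reduceByKey (an illustration of mine, not from the original post; the input path data/words.txt is an assumption):

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
      val sc = new SparkContext(sparkConf)

      // Split each line into words, map every word to (word, 1), then sum the counts per key.
      val wordCount: RDD[(String, Int)] = sc.textFile("data/words.txt")
        .flatMap(_.split(" "))
        .map((_, 1))
        .reduceByKey(_ + _)

      wordCount.collect().foreach(println)

      sc.stop()

    }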

2.3, groupByKey

  1. Function signatures

    def groupByKey(): RDD[(K, Iterable[V])]

    def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

    def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
  2. Function description

    Group data source values by key

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - groupByKey
      val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))

      // groupByKey: groups the elements with the same key into one group, producing a two-element tuple
      // The first element of the tuple is the key
      // The second element of the tuple is the collection of values that share that key
      val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()

      groupRDD.collect().foreach(println)

      val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
      groupRDD1.collect().foreach(println)

      sc.stop()

    }
  3. Consider a question: What is the difference between reduceByKey and groupByKey?

    From the shuffle perspective: both reduceByKey and groupByKey involve a shuffle, but reduceByKey pre-aggregates (combines) the data with the same key inside each partition before the shuffle, which reduces the amount of data spilled to disk. groupByKey only groups and does not reduce the amount of data, so reduceByKey has better performance.

    From the functional perspective: reduceByKey performs both grouping and aggregation, while groupByKey can only group. So when you need grouped aggregation, reduceByKey is recommended; if you only need grouping without aggregation, groupByKey is the only choice.
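
    A small sketch of the functional difference described above (an illustration of mine, not from the original post): the same per-key sum written both ways.

    val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))

    // reduceByKey: grouping and aggregation in one operator, with pre-aggregation inside each partition
    rdd.reduceByKey(_ + _).collect().foreach(println)

    // groupByKey: only groups; the aggregation has to be done separately afterwards
    rdd.groupByKey().mapValues(_.sum).collect().foreach(println)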

2.4, aggregateByKey

  1. Function signatures

    def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
        combOp: (U, U) => U): RDD[(K, U)]
  2. Function description

    Aggregates data, allowing one computation rule within partitions and a different rule between partitions.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - aggregateByKey

      val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("a", 4)), 2)
      // partition 0: (a, [1, 2])    partition 1: (a, [3, 4])
      // in-partition max: (a, 2), (a, 4)
      // between partitions (sum): (a, 6)

      // aggregateByKey is curried and has two parameter lists
      // The first parameter list takes a single argument: the initial (zero) value,
      // used as the starting value when the first value of a key is processed within a partition
      // The second parameter list takes two arguments:
      //   the first is the computation rule within a partition
      //   the second is the computation rule between partitions

      // math.min(x, y)
      // math.max(x, y)
      rdd.aggregateByKey(0)(
        (x, y) => math.max(x, y),
        (x, y) => x + y
      ).collect.foreach(println)

      sc.stop()

    }
    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - aggregateByKey

      val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
        ("b", 4), ("b", 5), ("a", 6)), 2)

      // aggregateByKey is curried and has two parameter lists
      // The first parameter list takes a single argument: the initial (zero) value,
      // used as the starting value when the first value of a key is processed within a partition
      // The second parameter list takes two arguments:
      //   the first is the computation rule within a partition
      //   the second is the computation rule between partitions

      // math.min(x, y)
      // math.max(x, y)
      rdd.aggregateByKey(5)(
        (x, y) => math.max(x, y),
        (x, y) => x + y
      ).collect.foreach(println)

      rdd.aggregateByKey(0)(
        (x, y) => x + y,
        (x, y) => x + y
      ).collect.foreach(println)

      rdd.aggregateByKey(0)(_ + _, _ + _).collect.foreach(println)

      sc.stop()

    }
  3. Consider a question: What if the calculation rules within partitions are the same as those between partitions? (WordCount)

    If the in-partition and between-partition computation rules are the same during the aggregation, Spark provides a simplified operator: foldByKey.

2.5, foldByKey

  1. Function signatures

    def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
  2. Function description

    When the computation rules within and between partitions are the same, aggregateByKey can be simplified to foldByKey.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - foldByKey

      val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
        ("b", 4), ("b", 5), ("a", 6)), 2)

      //rdd.aggregateByKey(0)(_+_, _+_).collect.foreach(println)

      // When the in-partition and between-partition rules are the same, Spark provides the simplified foldByKey
      rdd.foldByKey(0)(_ + _).collect.foreach(println)

      sc.stop()

    }

2.6, combineByKey

  1. Function signatures

    def combineByKey[C](
        createCombiner: V => C,
        mergeValue: (C, V) => C,
        mergeCombiners: (C, C) => C): RDD[(K, C)]
  2. Function description

    The most general aggregation function for key-value RDDs. Similar to aggregate(), combineByKey() allows the return type to differ from the input value type.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - key-value type - combineByKey

      val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
        ("b", 4), ("b", 5), ("a", 6)), 2)

      // combineByKey takes three parameters:
      // The first transforms the first value of each key (here into a (sum, count) pair)
      // The second is the computation rule within a partition
      // The third is the computation rule between partitions
      val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(
        v => (v, 1),
        (t: (Int, Int), v) => {
          (t._1 + v, t._2 + 1)
        },
        (t1: (Int, Int), t2: (Int, Int)) => {
          (t1._1 + t2._1, t1._2 + t2._2)
        }
      )

      // average value per key (integer division)
      val resultRDD: RDD[(String, Int)] = newRDD.mapValues {
        case (num, cnt) => {
          num / cnt
        }
      }
      resultRDD.collect().foreach(println)

      sc.stop()

    }
  2. Consider a question: What is the difference between reduceByKey, foldByKey, aggregateByKey and combineByKey?

    • reduceByKey: the first value of each key is not transformed; the computation rules within and between partitions are the same.

    • foldByKey: within a partition, the first value of each key is combined with the initial value; the rules within and between partitions are the same.

    • aggregateByKey: within a partition, the first value of each key is combined with the initial value; the rules within and between partitions can be different.

    • combineByKey: when the data structure does not meet the requirements of the computation, the first value of each key can be transformed into a new structure; the rules within and between partitions can be different.
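
    A short hedged sketch tying the four operators together (an illustration of mine, not from the original post): the same per-key sum expressed with each of them should produce the same result.

    val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3)), 2)

    rdd.reduceByKey(_ + _).collect().foreach(println)
    rdd.foldByKey(0)(_ + _).collect().foreach(println)
    rdd.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)
    rdd.combineByKey(
      (v: Int) => v,
      (c: Int, v: Int) => c + v,
      (c1: Int, c2: Int) => c1 + c2
    ).collect().foreach(println)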

2.7, sortByKey

  1. Function signatures

    def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
        : RDD[(K, V)]
  2. Function description

    Called on a (K, V) RDD where K must implement the Ordered trait; returns an RDD sorted by key.
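
    A minimal usage sketch (an illustration of mine, not from the original post):

    val rdd = sc.makeRDD(List(("b", 2), ("a", 1), ("c", 3)))

    // Ascending by key (the default); pass false for descending order.
    println(rdd.sortByKey().collect().mkString(","))
    println(rdd.sortByKey(false).collect().mkString(","))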

2.8, join

  1. Function signatures

    def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))]
  2. Function description

    Called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (V, W)) in which the elements with the same key from both RDDs are paired together.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - join

      val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
      val rdd2 = sc.makeRDD(List(("a", 5), ("b", 6), ("c", 4)))

      // join: elements of the two RDDs with the same key are joined together to form a tuple
      // If a key has no match in the other RDD, it does not appear in the result
      // If the same key appears multiple times in both RDDs, every combination is matched
      // (a Cartesian-product-like expansion); the data volume can grow dramatically and hurt performance
      val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)

      joinRDD.collect().foreach(println)

      sc.stop()

    }
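
    A small hedged sketch of the duplicate-key note in the comments above (an illustration of mine, not from the original post): when both RDDs contain the key "a" twice, join produces 2 x 2 = 4 pairs for "a".

    val left  = sc.makeRDD(List(("a", 1), ("a", 2)))
    val right = sc.makeRDD(List(("a", 3), ("a", 4)))

    // Expected output (order may vary): (a,(1,3)), (a,(1,4)), (a,(2,3)), (a,(2,4))
    println(left.join(right).collect().mkString(","))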

2.9, leftOuterJoin or rightOuterJoin

  1. Function signatures

    def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
  2. Function description

    Left outer join and right outer join, analogous to LEFT OUTER JOIN and RIGHT OUTER JOIN in SQL.

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - leftOuterJoin and rightOuterJoin

      val rdd1 = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
      val rdd2 = sc.makeRDD(List(
        ("a", 4), ("b", 5) //, ("c", 6)
      ))

      val leftJoinRDD = rdd1.leftOuterJoin(rdd2)
      val rightJoinRDD = rdd1.rightOuterJoin(rdd2)

      leftJoinRDD.collect().foreach(println)
      // rightJoinRDD.collect().foreach(println)

      sc.stop()

    }

2.10, cogroup

  1. Function signatures

    def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
  2. Function description

    Called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])).

    def main(args: Array[String]): Unit = {

      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)

      // Operator - cogroup

      val rdd1 = sc.makeRDD(List(
        ("a", 1), ("b", 2) //, ("c", 3)
      ))

      val rdd2 = sc.makeRDD(List(("a", 4), ("b", 5), ("c", 6), ("c", 7)))

      // cogroup: connect + group; groups the values of each key from both RDDs together
      val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)

      cgRDD.collect().foreach(println)

      sc.stop()

    }

II. Related Links

Big Data Spark Learning Journey, Part 3

Big Data Spark Learning Journey, Part 2

Big Data Spark Learning Journey, Part 1