
I. RDD Transformation Operators

RDD transformation operators are divided into three categories according to the data they process: Value type, double-Value type, and Key-Value type.

1. Value type

1.1, map

  1. Function signatures

    def map[U: ClassTag](f: T => U): RDD[U]
  2. Function description

    The data is mapped and transformed one element at a time; the transformation can be a type conversion or a value conversion.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - map
    
      val rdd = sc.makeRDD(List(1, 2, 3, 4))
      // 1, 2, 3, 4
      // 2, 4, 6, 8
    
      // Conversion function
      def mapFunction(num: Int): Int = {
        num * 2
      }
    
      //val mapRDD: RDD[Int] = rdd.map(mapFunction)
      //val mapRDD: RDD[Int] = rdd.map((num:Int)=>{num*2})
      //val mapRDD: RDD[Int] = rdd.map((num:Int)=>num*2)
      //val mapRDD: RDD[Int] = rdd.map((num)=>num*2)
      //val mapRDD: RDD[Int] = rdd.map(num=>num*2)
      val mapRDD: RDD[Int] = rdd.map(_ * 2)
    
      mapRDD.collect().foreach(println)
    
      sc.stop()
    
    }
    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // Operator - map partition behaviour
    
      // 1. Within a partition, an RDD computes data one element at a time.
      // The next element is processed only after all the logic for the previous element has finished,
      // so execution within a partition is ordered.
      // 2. Across different partitions, the order of computation is not guaranteed.
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      val mapRDD = rdd.map(
        num => {
          println(">>>>>>>>" + num)
          num
        }
      )
      val mapRDD1 = mapRDD.map(
        num => {
          println("######" + num)
          num
        }
      )
    
      mapRDD1.collect()
    
      sc.stop()
    
    }
    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - map
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
      // [1,2], [3,4]
      rdd.saveAsTextFile("output")
      val mapRDD = rdd.map(_ * 2)
      // [2,4], [6,8]
      mapRDD.saveAsTextFile("output1")
    
      sc.stop()
    
    }
  3. Small function: obtain the URL resource path of each user request from the server log file apache.log

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - map
      val rdd = sc.textFile("datas/apache.log")
    
      // long string => short string
      val mapRDD: RDD[String] = rdd.map(
        line => {
          val datas = line.split(" ")
          datas(6)
        }
      )
    
      mapRDD.collect().foreach(println)
    
      sc.stop()
    
    }

1.2, mapPartitions

  1. Function signatures

    def mapPartitions[U: ClassTag](
        f: Iterator[T] => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]
  2. Function description

    The data is sent to the compute nodes for processing one partition at a time. "Processing" here means arbitrary operations on the partition's data, including filtering it (a small filtering sketch follows the example below).

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - mapPartitions
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      // mapPartitions: data can be converted one partition at a time,
      // but the entire partition is loaded into memory and stays referenced,
      // so processed data is not released while the partition is still in use.
      // With little memory and a large amount of data, memory overflow can easily occur.
      val mpRDD: RDD[Int] = rdd.mapPartitions(
        iter => {
          println(">>>>>>>>>>")
          iter.map(_ * 2)
        }
      )
      mpRDD.collect().foreach(println)
    
      sc.stop()
    
    }
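    Since the description above also mentions filtering, here is a minimal sketch (not from the original article) of dropping data inside mapPartitions; the object name and values are illustrative.

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    object MapPartitionsFilterSketch {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
    
        val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    
        // The returned iterator may contain fewer elements than it received,
        // so mapPartitions can drop data; here only the even numbers are kept.
        val evenRDD: RDD[Int] = rdd.mapPartitions(
          iter => iter.filter(_ % 2 == 0)
        )
    
        evenRDD.collect().foreach(println) // 2, 4
    
        sc.stop()
      }
    }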
  3. Small function: Get the maximum value of each data partition

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - mapPartitions
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      // [1,2], [3,4]
      // [2], [4]
      val mpRDD = rdd.mapPartitions(
        iter => {
          List(iter.max).iterator
        }
      )
      mpRDD.collect().foreach(println)
      sc.stop()
    
    }

Consider a question: what is the difference between map and mapPartitions?

  • Data processing perspective

    The map operator processes data one element at a time within a partition, similar to a serial operation. The mapPartitions operator processes a whole partition as a batch.

  • Functional perspective

    The map operator transforms the data in the data source, but it cannot increase or decrease the number of elements. The mapPartitions operator receives an iterator and returns an iterator, and the number of elements does not have to stay the same, so data can be added or removed.

  • Performance perspective

    Because map works element by element, like a serial operation, its performance is relatively low, while mapPartitions works batch by batch, so its performance is higher. However, mapPartitions keeps the whole partition in memory for a long time, which may lead to insufficient memory and out-of-memory errors. When memory is limited, map is recommended instead. A short code sketch contrasting the two follows.
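The contrast can be seen directly in code. The following is a minimal sketch, not part of the original text; the object name and the per-partition "setup" comment are illustrative assumptions.

    import org.apache.spark.{SparkConf, SparkContext}
    
    object MapVsMapPartitions {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
    
        val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    
        // map: the function is invoked once per element (4 times here).
        val byElement = rdd.map(_ * 2)
    
        // mapPartitions: the function is invoked once per partition (2 times here),
        // so any per-partition setup cost is paid once per partition, not per element;
        // the whole partition is held in memory while the iterator is consumed.
        val byPartition = rdd.mapPartitions { iter =>
          // hypothetical per-partition setup would go here
          iter.map(_ * 2)
        }
    
        println(byElement.collect().mkString(","))   // 2,4,6,8
        println(byPartition.collect().mkString(",")) // 2,4,6,8
    
        sc.stop()
      }
    }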

1.3, mapPartitionsWithIndex

  1. Function signatures

    def mapPartitionsWithIndex[U: ClassTag](
        f: (Int, Iterator[T]) => Iterator[U],
        preservesPartitioning: Boolean = false): RDD[U]
  2. Function description

    The data is sent to the compute nodes for processing one partition at a time. "Processing" here means arbitrary operations, including filtering data, and the index of the current partition is available while processing.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - mapPartitionsWithIndex
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      val mpiRDD = rdd.mapPartitionsWithIndex(
        (index, iter) => {
          // 1, 2, 3, 4
          iter.map(
            num => {
              (index, num)
            }
          )
        }
      )
    
      mpiRDD.collect().foreach(println)
    
      sc.stop()
    
    }
  3. Small function: get the data of the second data partition

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - mapPartitionsWithIndex
      val rdd = sc.makeRDD(List(1, 2, 3, 4), 2)
      // [1,2], [3,4]
      val mpiRDD = rdd.mapPartitionsWithIndex(
        (index, iter) => {
          if (index == 1) {
            iter
          } else {
            Nil.iterator
          }
        }
      )
    
      mpiRDD.collect().foreach(println)
    
      sc.stop()
    
    }

1.4, flatMap

  1. Function signatures

    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
  2. Function description

    The data processed is flattened and then mapped, so the operator is also called flat mapping

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - flatMap
    
      val rdd: RDD[String] = sc.makeRDD(List(
        "Hello Scala", "Hello Spark"
      ))
    
      val flatRDD: RDD[String] = rdd.flatMap(
        s => {
          s.split(" ")
        }
      )
    
      flatRDD.collect().foreach(println)
    
      sc.stop()
    
    }
  3. Small function: flatten List(List(1,2), 3, List(4,5))

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - flatMap
    
      val rdd = sc.makeRDD(List(List(1, 2), 3, List(4, 5)))
    
      // val flatRDD = rdd.flatMap(
      //   data => {
      //     data match {
      //       case list: List[_] => list
      //       case dat => List(dat)
      //     }
      //   }
      // )
    
      val flatRDD = rdd.flatMap {
        case list: List[_] => list
        case dat => List(dat)
      }
    
      flatRDD.collect().foreach(println)
    
      sc.stop()
    
    }

1.5, glom

  1. Function signatures

    def glom(): RDD[Array[T]]
  2. Function description

    The data within each partition is converted directly into an array of the same element type; the partitioning remains unchanged.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - glom
    
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      // List => Int
      // Int => Array
      val glomRDD: RDD[Array[Int]] = rdd.glom()
    
      glomRDD.collect().foreach(data => println(data.mkString(",")))
    
      sc.stop()
    
    }
  3. Small function: calculate the sum of the maximum value of all partitions (take the maximum value within a partition and sum the maximum value between partitions)

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - glom
    
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      // [1,2], [3,4]
      // [2], [4]
      // [6]
      val glomRDD: RDD[Array[Int]] = rdd.glom()
    
      val maxRDD: RDD[Int] = glomRDD.map(
        array => {
          array.max
        }
      )
      println(maxRDD.collect().sum)
    
      sc.stop()
    
    }

1.6, groupBy

  1. Function signatures

    def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
  2. Function description

    Data is grouped according to the specified rule. By default the number of partitions stays the same, but the data is shuffled and regrouped. In the extreme case, all data may end up in the same partition. The data of one group is placed in a single partition, but this does not mean a partition contains only one group.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - groupBy
      val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
    
      // groupBy groups each element by the key returned by the grouping function;
      // elements with the same key end up in the same group
      def groupFunction(num: Int) = {
        num % 2
      }
    
      val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
    
      groupRDD.collect().foreach(println)
    
      sc.stop()
    
    }
  3. Small function: group the List("Hello", "hive", "hbase", "Hadoop") by the first letter of each word.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - groupBy
    
      val rdd = sc.makeRDD(List("Hello", "Spark", "Scala", "Hadoop"), 2)
    
      // Grouping and partitioning are not necessarily related
      val groupRDD = rdd.groupBy(_.charAt(0))
    
      groupRDD.collect().foreach(println)
    
      sc.stop()
    
    }
  4. Small function: obtain the number of accesses in each hour from the server log data apache.log.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - groupBy
      // requires: import java.text.SimpleDateFormat and import java.util.Date
      val rdd = sc.textFile("datas/apache.log")
    
      val timeRDD: RDD[(String, Iterable[(String, Int)])] = rdd.map(
        line => {
          val datas = line.split(" ")
          val time = datas(3)
          //time.substring(0, )
          val sdf = new SimpleDateFormat("dd/MM/yyyy:HH:mm:ss")
          val date: Date = sdf.parse(time)
          val sdf1 = new SimpleDateFormat("HH")
          val hour: String = sdf1.format(date)
          (hour, 1)
        }
      ).groupBy(_._1)
    
      timeRDD.map {
        case (hour, iter) => {
          (hour, iter.size)
        }
      }.collect.foreach(println)
    
      sc.stop()
    
    }

1.7, filter

  1. Function signatures

    def filter(f: T => Boolean): RDD[T]
  2. Function description

    Data is filtered according to the specified rules. Data that meets the rules is retained and data that does not meet the rules is discarded. After data is filtered, partitions remain the same, but data in partitions may be unbalanced. In the production environment, data skew may occur.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - filter
      val rdd = sc.makeRDD(List(1, 2, 3, 4))
    
      val filterRDD: RDD[Int] = rdd.filter(num => num % 2 != 0)
    
      filterRDD.collect().foreach(println)
    
      sc.stop()
    
    }
  3. Small function: Obtain the request path for May 17, 2015 from the server log data apache.log

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - filter
      val rdd = sc.textFile("datas/apache.log")
    
      rdd.filter(
        line => {
          val datas = line.split(" ")
          val time = datas(3)
          time.startsWith("17/05/2015")
        }
      ).collect().foreach(println)
    
      sc.stop()
    
    }

1.8, sample

  1. Function signatures

    def sample(
        withReplacement: Boolean,
        fraction: Double,
        seed: Long = Utils.random.nextLong): RDD[T]
  2. Function description

    Extract data from a data set according to the specified rules

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - sample
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
    
      // The sample operator takes three arguments
      // 1. The first argument indicates whether extracted data is put back: true (with replacement), false (without replacement)
      // 2. The second argument means:
      //    - without replacement: the probability that each element in the data source is selected (a benchmark value)
      //    - with replacement: the possible number of times each element in the data source is selected
      // 3. The third argument is the seed for the random algorithm used when sampling
      //    If the third argument is not passed, the current system time is used as the seed
      // println(rdd.sample(
      //   false,
      //   0.4
      //   // 1
      // ).collect().mkString(","))
    
      println(rdd.sample(
        true,
        2
        // 1
      ).collect().mkString(","))
    
      sc.stop()
    
    }
  3. Consider a question: what is the sample operator actually useful for? Just drawing lotteries?
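    One common answer, offered here as an illustrative sketch rather than anything stated in the original text: sample is often used to cheaply estimate the distribution of a large data set, for example to check for data skew before an expensive shuffle. The object name and the numbers below are made up.

    import org.apache.spark.{SparkConf, SparkContext}
    
    object SampleSkewCheck {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
    
        // A deliberately skewed data set: the value 1 dominates.
        val rdd = sc.makeRDD(List(1, 1, 1, 1, 1, 1, 2, 3, 4, 5), 2)
    
        // Sample roughly half the data without replacement and count the sampled
        // values to get a rough, cheap estimate of the overall distribution.
        val estimate = rdd
          .sample(withReplacement = false, fraction = 0.5)
          .map(num => (num, 1))
          .reduceByKey(_ + _)
    
        estimate.collect().foreach(println)
    
        sc.stop()
      }
    }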

1.9, distinct

  1. Function signatures

    def distinct()(implicit ord: Ordering[T] = null): RDD[T]
    def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
  2. Function description

    Deduplication of duplicate data in a dataset

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - distinct
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
    
      val rdd1: RDD[Int] = rdd.distinct()
    
      rdd1.collect().foreach(println)
    
      sc.stop()
    
    }
  3. Consider a question: how do you implement data de-duplication without using this operator?

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - distinct
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
    
      // the same map / reduceByKey / map combination that distinct uses internally
      rdd.map(x => (x, null))
        .reduceByKey((x, _) => x, 2)
        .map(_._1)
        .collect()
        .foreach(println)
    
      sc.stop()
    
    }

1.10, coalesce

  1. Function signatures
    def coalesce(numPartitions: Int, shuffle: Boolean = false,
        partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
        (implicit ord: Ordering[T] = null): RDD[T]
  2. Function description

    Reduces the number of partitions based on the data volume, to improve execution efficiency after a large data set has been filtered down to a small one. When a Spark job contains too many small tasks, coalesce can be used to merge partitions, reducing the partition count and the cost of task scheduling (a sketch of this filter-then-coalesce pattern follows the example below).

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - coalesce
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
    
      // By default, the coalesce method does not shuffle and recombine partition data
      // In that case, shrinking the partition count may lead to data imbalance and skew
      // If you want the data to be balanced, you can enable shuffle
      //val newRDD: RDD[Int] = rdd.coalesce(2)
      val newRDD: RDD[Int] = rdd.coalesce(2, true)
    
      newRDD.saveAsTextFile("output")
    
      sc.stop()
    
    }
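    A small sketch of the use case described above, not part of the original article: after a large data set is filtered down, coalesce can shrink the partition count so that many nearly empty partitions do not each become a tiny task. The object name and the numbers are illustrative.

    import org.apache.spark.{SparkConf, SparkContext}
    
    object FilterThenCoalesce {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
    
        // Start with many partitions, then filter away most of the data.
        val rdd = sc.makeRDD(1 to 100, 8)
        val filtered = rdd.filter(_ % 20 == 0) // only 5 elements survive
    
        // Without coalesce, those 5 elements are still spread over 8 partitions,
        // each of which would be scheduled as its own, mostly empty, task downstream.
        val compacted = filtered.coalesce(2)
    
        println(filtered.getNumPartitions)  // 8
        println(compacted.getNumPartitions) // 2
    
        sc.stop()
      }
    }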
  3. Consider a question: what if we want to increase the number of partitions?

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - coalesce
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    
      // The coalesce operator can request more partitions, but without shuffle this is meaningless.
      // To increase the number of partitions, shuffle must be used.
      // Spark provides a simplified operation:
      // to reduce partitions: coalesce; enable shuffle if the data should be rebalanced
      // to increase partitions: repartition, whose underlying code calls coalesce with shuffle enabled
      //val newRDD: RDD[Int] = rdd.coalesce(3, true)
      val newRDD: RDD[Int] = rdd.repartition(3)
    
      newRDD.saveAsTextFile("output")
    
      sc.stop()
    
    }

1.11, repartition

  1. Function signatures

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
  2. Function description

    Internally, repartition calls coalesce with shuffle set to true. repartition works whether you are converting an RDD with many partitions into one with fewer partitions or an RDD with few partitions into one with more, because a shuffle is always performed.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - repartition
      val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
    
      // The coalesce operator can request more partitions, but without shuffle this is meaningless.
      // To increase the number of partitions, shuffle must be used.
      // Spark provides a simplified operation:
      // to reduce partitions: coalesce; enable shuffle if the data should be rebalanced
      // to increase partitions: repartition, whose underlying code calls coalesce with shuffle enabled
      //val newRDD: RDD[Int] = rdd.coalesce(3, true)
      val newRDD: RDD[Int] = rdd.repartition(3)
    
      newRDD.saveAsTextFile("output")
    
      sc.stop()
    
    }
  3. Consider the difference between coalesce and repartition.
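    A minimal sketch of the difference, offered as an illustration with a made-up object name: repartition(n) is equivalent to coalesce(n, shuffle = true), so it always shuffles and can both increase and decrease the partition count, while coalesce without shuffle can only merge partitions.

    import org.apache.spark.{SparkConf, SparkContext}
    
    object CoalesceVsRepartition {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
        val sc = new SparkContext(sparkConf)
    
        val rdd = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 6)
    
        // coalesce without shuffle: merges existing partitions with no full shuffle;
        // asking for more partitions than before has no effect in this mode.
        val shrunk = rdd.coalesce(2)
        println(shrunk.getNumPartitions) // 2
    
        // repartition: equivalent to coalesce(n, shuffle = true), always shuffles,
        // so it can grow the partition count as well as shrink it.
        val grown = rdd.repartition(8)
        println(grown.getNumPartitions) // 8
    
        sc.stop()
      }
    }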

1.12, sortBy

  1. Function signatures

    def sortBy[K](
        f: (T) => K,
        ascending: Boolean = true,
        numPartitions: Int = this.partitions.length)
        (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
  2. Function description

    This operation sorts the data. Before sorting, the data can be processed by the function f, and the data is then sorted by the result of f, in ascending order by default. The number of partitions of the new RDD is the same as that of the original RDD. A shuffle is involved.

    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - sortBy
      val rdd = sc.makeRDD(List(6, 2, 4, 5, 3, 1), 2)
    
      val newRDD: RDD[Int] = rdd.sortBy(num => num)
    
      newRDD.saveAsTextFile("output")
    
      sc.stop()
    
    }
    def main(args: Array[String]): Unit = {
    
      val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Operator")
      val sc = new SparkContext(sparkConf)
    
      // operator - sortBy
      val rdd = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)), 2)
    
      // sortBy sorts the data in the data source by the specified rule, ascending by default;
      // the second argument changes the sort order
      // sortBy does not change the number of partitions by default, but a shuffle is involved
      val newRDD = rdd.sortBy(t => t._1.toInt, false)
    
      newRDD.collect().foreach(println)
    
      sc.stop()
    
    }

2. Double Value type

3. Key-value type

II. Related Links

Big Data Spark Learning Journey, Part 2

Big Data Spark Learning Journey, Part 1