
This article describes the transformation operators of the Spark computing engine in big data.

Recommended reading time: 15-20 minutes


Value-type operators

The map operator

Function signatures

def map[U: ClassTag](f: T => U): RDD[U]

Function description

The data is mapped and transformed one element at a time; the transformation can change the value or the type.

Demo code

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val dataRDD1: RDD[Int] = dataRDD.map(
    num => {
        num * 2
    }
)
val dataRDD2: RDD[String] = dataRDD1.map(
    num => {
        "" + num
    }
)

println(dataRDD1.collect().mkString("\t"))
println(dataRDD2.collect().mkString("\t"))


The first map is a value conversion: each value in the original list is multiplied by 2.

The second map is a type conversion: each value in the list is converted from Int to String.

The results

2	4	6	8
2	4	6	8

MapPartitions operator

Function signatures

def mapPartitions[U: ClassTag](
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U]

Function description

mapPartitions processes data one partition at a time, while map processes data one element at a time; mapPartitions is recommended when memory is ample.

In short: map transforms elements one by one, whereas mapPartitions transforms data on a per-partition basis.

Demo code

listRD.mapPartitions(datas => {
    datas.map(data => data * 2)
})

Advantages: mapPartitions is more efficient than map because it reduces the number of interactions with the executor (one interaction per partition rather than one per element).

Disadvantages: it can cause memory overflow when a large amount of data lands in a single partition.
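
A minimal sketch of why the per-partition model helps, assuming the sc from the map example above (names are illustrative): any setup written inside the mapPartitions function runs once per partition rather than once per element.

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val doubled: RDD[Int] = rdd.mapPartitions(iter => {
  // Anything placed here (for example, opening a connection or a buffer)
  // runs once per partition, not once per element
  iter.map(_ * 2)
})
println(doubled.collect().mkString("\t"))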

MapPartitionsWithIndex operator

Function signatures

def mapPartitionsWithIndex[U: ClassTag](
  f: (Int, Iterator[T]) => Iterator[U],
  preservesPartitioning: Boolean = false): RDD[U]

Function description

Data is processed by partition on the compute nodes, with the partition index carried along. "Processing" here means any operation can be performed, even filtering out data (see the sketch after the demo), and the current partition index can be obtained during processing.

Demo code

val res: RDD[(Int, String)] = dataRDD.mapPartitionsWithIndex {
      case (num, datas) => {
        datas.map((_, "Partition No. :" + num))
      }
    }
println(res.collect().mkString("\n"))

val dataRDD1 = dataRDD.mapPartitionsWithIndex(
    (index, datas) => {
         datas.map((index, _))
    }
)

The results

(1,Partition No. :0)
(2,Partition No. :0)
(3,Partition No. :1)
(4,Partition No. :1)
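
As noted above, the function can also filter data by partition. A small sketch of that idea, reusing the dataRDD from the map example (the partition index chosen here is only illustrative):

// Keep only the data in partition 1; every other partition returns an empty iterator
val onlyPartition1: RDD[Int] = dataRDD.mapPartitionsWithIndex(
  (index, datas) => if (index == 1) datas else Nil.iterator
)
println(onlyPartition1.collect().mkString("\t"))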

FlatMap operator

Function signatures

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]

Function description

map: produces one new element per input element (however many elements go in, the same number come out).

flatMap: can produce zero, one, or more new elements per input element (possibly more elements than the original). flatMap first flattens the processed data and then maps it, so the operator is also called a flat map.

Demo code

val lineArray = Array("hello you", "hello me", "hello world")
val lines = sc.parallelize(lineArray, 1)
val res1: RDD[Array[String]] = lines.map(_.split(" "))
val res2: RDD[String] = lines.flatMap(_.split(" "))

println("map:")
res1.foreach(
  line => println(line.mkString("\t"))
)
println()
println("flatMap:")
res2.foreach(
  line => println(line.mkString("\t"))
)

The results

map:
hello	you
hello	me
hello	world

flatMap:
h	e	l	l	o
y	o	u
h	e	l	l	o
m	e
h	e	l	l	o
w	o	r	l	d

Glom operator

Function signatures

def glom(): RDD[Array[T]]

Function description

The data in each partition is converted directly into an in-memory array of the same type. The partitioning stays unchanged, which makes glom suitable for statistics computed per partition.

Demo code

val rdd: RDD[Int] = sc.makeRDD(List(1, 5, 6, 9, 8, 5, 6, 2, 4, 65, 2), 4)
val arrayrdd: RDD[Array[Int]] = rdd.glom()
arrayrdd.collect().foreach(array => {
  println(array.max)
})

GroupBy operator

Function signatures

def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]

Function description

Groups the data by the return value of the passed function; values that map to the same key are placed into the same iterator.

Demo code

val rdd: RDD[Int] = sc.makeRDD(List(1, 5, 6, 9, 8, 5, 6, 2, 4, 65, 2))
val grouprdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(i => i % 2)
grouprdd.foreach(println)

The filter operator

Function signatures

def filter(f: T => Boolean): RDD[T]

Function description

Data is filtered according to the specified rules. Data that meets the rules is retained and data that does not meet the rules is discarded. After data is filtered, partitions remain the same, but data in partitions may be unbalanced. In the production environment, data skew may occur.

Demo code

val dataRDD = sparkContext.makeRDD(List(
    1, 2, 3, 4
), 1)
val dataRDD1 = dataRDD.filter(_ % 2 == 0)

The sample operator

Function signatures

def sample(
  withReplacement: Boolean,
  fraction: Double,
  seed: Long = Utils.random.nextLong): RDD[T]

Function description

Extract data from a data set according to the specified rules

Code demo

val dataRDD = sparkContext.makeRDD(List(
    1, 2, 3, 4
), 1)

val dataRDD1 = dataRDD.sample(false, 0.5)
val dataRDD2 = dataRDD.sample(true, 2)

Extension:

Extract data without putting it back (Bernoulli algorithm)

Bernoulli algorithm: also known as 0, 1 distribution. Like flipping a coin, either heads or tails.

Concrete implementation: a random number is computed from the seed and compared with the probability given by the second parameter; if it is less than the second parameter, the element is selected, otherwise it is not.

First parameter: whether extracted data is put back; false: not put back.

Second parameter: the probability that each element is selected, in the range [0, 1]; 0: none are selected, 1: all are selected.

Third parameter: random number seed

Extract data and put it back (Poisson algorithm)

First parameter: whether extracted data is put back; true: put back, false: not put back.

Second parameter: the expected number of times each element is extracted; must be greater than or equal to 0 (a code sketch follows this list).

Third parameter: random number seed
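
A minimal sketch of the two sampling modes, assuming a ten-element RDD and a fixed seed so the run is repeatable (values chosen for illustration):

val rdd: RDD[Int] = sc.makeRDD(1 to 10, 1)

// Bernoulli sampling: without replacement, each element kept with probability 0.5
val noReplace = rdd.sample(withReplacement = false, fraction = 0.5, seed = 1L)

// Poisson sampling: with replacement, each element expected to appear about twice
val withReplace = rdd.sample(withReplacement = true, fraction = 2.0, seed = 1L)

println(noReplace.collect().mkString("\t"))
println(withReplace.collect().mkString("\t"))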

Distinct operator

Function signatures

def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Function description

Removes duplicate elements from the data; call it directly on the RDD that needs de-duplication.

val lineArray = Array("hello you", "hello me", "hello world", "hello me")
val lines = sc.parallelize(lineArray, 1)
val value: RDD[String] = lines.distinct()

Coalesce and repartition operators

Function signatures

def coalesce(numPartitions: Int, shuffle: Boolean = false,
           partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
          (implicit ord: Ordering[T] = null)
  : RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]

Function description

repartition actually invokes coalesce, with shuffle enabled by default; with coalesce, the shuffle step is optional.

Extension:

1) If N < M

Generally, the data in the N partitions is unevenly distributed. Use the HashPartitioner to redistribute the data into M partitions; in this case shuffle must be set to true (repartition can do this, coalesce cannot).

2) If N > M and N and M are close in value

In this case, shuffle can be set to false (the coalesce implementation). If M > N, coalesce is ineffective and no shuffle is performed: there is a narrow dependency between the parent and child RDD, so the number of partitions cannot be increased. In other words, with shuffle set to false, if the requested number of partitions is greater than the current number, the RDD keeps its current partition count; the number of partitions cannot be increased without a shuffle.

3) If N > M and the two differ greatly

If the number of executors is less than the number of partitions to be generated, coalesce is efficient. Conversely, using coalesce leaves (number of executors minus number of target partitions) executors idle, reducing efficiency. If M is 1, shuffle can be set to true so that the operations performed before the coalesce keep better parallelism (see the sketch below).
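
No demo accompanies these two operators above, so here is a minimal sketch of the behaviour just described, with partition counts chosen purely for illustration:

val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 6)

// Shrink from 6 to 2 partitions; coalesce does not shuffle by default
val shrunk: RDD[Int] = rdd.coalesce(2)
println(shrunk.getNumPartitions)          // 2

// repartition always shuffles, so it can grow the partition count
val grown: RDD[Int] = rdd.repartition(8)
println(grown.getNumPartitions)           // 8

// Without a shuffle, coalesce cannot increase the number of partitions
println(rdd.coalesce(8).getNumPartitions) // still 6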

The sortBy operator

Function signatures

def sortBy[K](
  f: (T) => K,
  ascending: Boolean = true,
  numPartitions: Int = this.partitions.length)
  (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]

Function description

This operation sorts the data. Before sorting, the data can be processed by the function f, and the results of f determine the sort order, which is ascending by default. The number of partitions in the new RDD matches the number of partitions in the original RDD. To sort in descending order, set the second argument to false.

Demo code

val dataRDD = sparkContext.makeRDD(List(
    1, 2, 3, 4, 1, 2
), 2)

val dataRDD1 = dataRDD.sortBy(num => num, false, 4)

PartitionBy operator

Function signatures

def partitionBy(partitioner: Partitioner): RDD[(K, V)]

Function description

Repartitions the data according to the specified Partitioner. Spark's default partitioner is HashPartitioner.

HashPartitioner: the default partitioner; it assigns a partition by taking the hash of the key modulo the number of partitions.

RangePartitioner: requires the data to be comparable and sortable, so it is less widely applicable.

A custom partitioner can also be written.

Demo code

import org.apache.spark.Partitioner

val listRDD: RDD[(String, String)] = sc.makeRDD(
  List(
    ("cba", "1"), ("nba", "2"), ("nba", "3"),
    ("cba", "4"), ("cba", "5"), ("wnba", "6")
  ), 1
)
val rdd1: RDD[(String, String)] = listRDD.partitionBy(new MyPartition(2))
val rdd2: RDD[(Int, (String, String))] = rdd1.mapPartitionsWithIndex(
  (index, datas) =>
    datas.map(
      data => (index, data)
    )
)
rdd2.collect().foreach(println)
sc.stop()

// Extend Partitioner to implement a custom partitioner
class MyPartition(num: Int) extends Partitioner {
  // The number of partitions
  override def numPartitions: Int = num

  // Decide which partition each record goes to based on its key;
  // the method returns a partition index
  override def getPartition(key: Any): Int = {
    key match {
      case "nba" => 0
      case _     => 1
    }
  }
}

Note: if the partitioner passed in is the same as the RDD's current partitioner, nothing is done and the data is not repartitioned.
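
A small sketch of that note (the reference-equality check is only an illustration, not from the original article): partitioning a second time with an equal HashPartitioner is expected to return the same RDD without reshuffling.

import org.apache.spark.HashPartitioner

val pairs: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val once  = pairs.partitionBy(new HashPartitioner(2))
val twice = once.partitionBy(new HashPartitioner(2))
// The second call sees an equal partitioner, so it should return the same RDD
println(once eq twice)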


This is the end of this article. There will be follow-up articles in this series covering more of this material; interested readers, please stay tuned!