This is the fifth day of my participation in Gwen Challenge

introduce

  • RDD operators can be classified into two kinds of transformations or actions in terms of data operations.

    • Conversion operator: To convert one RDD to another RDD is just a function overlay and is not actually performed. (Decorator design mode)
    • Action operator: The action operator actually triggers SparkContext to submit the Job.

This article describes the conversion operator

1. map

The processed data is mapped and transformed one by one, where the conversion can be a type conversion or a value conversion

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
sc.makeRDD(List(1.2.3.4.5), 2).map(("num",_)).collect().foreach(println(_))
sc.stop()
Copy the code

2. mapPartitions

The data to be processed is sent to the compute node in the unit of partition for processing. The processing here means that the data can be processed arbitrarily, even if it is filtered

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)

sc.makeRDD(List(1.2.3.4.5), 2).mapPartitions(iter => {
    println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --"+iter.mkString(""))
    iter
}).collect()
sc.stop()
Copy the code

3. mapPartitionsWithIndex

The data to be processed is sent to the compute node in the unit of partition for processing. By processing, it means that you can perform any processing, even filtering data, and obtain the current partition index during processing

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)

sc.makeRDD(List(1.2.3.4.5), 2).mapPartitionsWithIndex((index, iter) => {
    println(index + "-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --" + iter.mkString(""))
    iter
}).collect()
sc.stop()
Copy the code

4. flatMap

The data processed is flattened and then mapped, so the operator is also called flat mapping

// Use examples
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val context = new SparkContext(sparkConf)
val lines = context.textFile("data/word.txt")
lines.flatMap(_.split("")).map((_, 1)).reduceByKey(_ + _).foreach(println(_))
sc.stop()
Copy the code

5. glom

The data in the same partition is directly converted into the same type of memory array for processing, the partition remains unchanged

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val glom: RDD[Array[Int]] = sc.makeRDD(List(1.2.3.4.5), 2).glom()
val map = glom.map(arr => {
    arr.sum
})
map.collect().foreach(println)
sc.stop()
Copy the code

6. groupBy

Data is grouped according to specified rules. Partitions remain unchanged by default, but data is shuffled and regrouped. In extreme cases, data may be grouped in the same partition

The data of one group is in one partition, but it is not said that there is only one group in one partition

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.groupBy(_._1).collect().foreach(println)
sc.stop()
Copy the code

7. filter

Data is filtered according to the specified rules. Data that meets the rules is retained and data that does not meet the rules is discarded.

After data is filtered, partitions remain the same, but data in partitions may be unbalanced, which may occur in the production environmentData skew.

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.filter(_._2 > 3).collect().foreach(println)
sc.stop()
Copy the code

8. sample

Extract data from a data set according to the specified rules

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.sample(withReplacement = false.0.5).collect().foreach(println)
value.sample(withReplacement = true.0.7).collect().foreach(println)
sc.stop()
Copy the code

9. distinct

Deduplication of duplicate data in a dataset

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3), ("b".3), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.distinct().collect().foreach(println)
sc.stop()
Copy the code

10. coalesce

Partition reduction based on data volume is used to improve the execution efficiency of small data sets after filtering large data sets

If spark has too many small tasks, you can use the coalesce method to reduce the number of coalesce partitions and reduce the task scheduling cost

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3), ("b".3), ("b".3),
    ("b".4), ("b".5), ("a".6)),4)
// You can only reduce partitions by default. If shuffle is enabled, you can increase partitions
value.coalesce(2).saveAsTextFile("out1")
sc.stop()
Copy the code

11. repartition

The coalesce operation is performed internally. The default value of shuffle is true. The repartition operation can be completed either by converting an RDD with a large number of partitions to an RDD with a small number of partitions or by converting an RDD with a large number of partitions to an RDD with a large number of partitions.

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3), ("b".3), ("b".3),
    ("b".4), ("b".5), ("a".6)),4)
value.repartition(6).saveAsTextFile("out3")
sc.stop()
Copy the code

12. sortBy

This operation is used to sort data. Before sorting, the data can be processed by the F function, and then sorted by the result of the F function processing, which is in ascending order by default. The number of partitions in the new RDD is the same as the number of partitions in the original RDD. There’s a shuffle

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(
    1.3.5.6.7.8.32.23.4.5.62.12.4
), 4)
//ascending = false
value.sortBy(i => i, numPartitions = 1, ascending = false).saveAsTextFile("out3")
sc.stop()
Copy the code

13. intersection

The intersection of the source RDD and the parameter RDD returns a new RDD

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value1 = sc.makeRDD(List(
    1.2.3.4.5.9
), 2)
val value2 = sc.makeRDD(List(
    5.7.8.9
), 2)
value1.intersection(value2).saveAsTextFile("out2")
sc.stop()
Copy the code

14. union

The union of the source RDD and the parameter RDD returns a new RDD

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value1 = sc.makeRDD(List(
    1.2.3.4.5.9
), 2)
val value2 = sc.makeRDD(List(
    5.7.8.9
), 2)
value1.union(value2).collect().foreach(println)
sc.stop()
Copy the code

15. subtract

Take one RDD element as the main element, remove the duplicate elements in the two RDD, and keep the other elements. O difference set

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value1 = sc.makeRDD(List(
    1.2.3.4.5.9
), 2)
val value2 = sc.makeRDD(List(
    5.7.8.9
), 2)
value1.subtract(value2).collect().foreach(println)
sc.stop()
Copy the code

16. zip

Combine elements in two RDD as key-value pairs. Where, Key in the key-value pair is the element in the first RDD, and Value is the element in the same position in the second RDD.

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value1 = sc.makeRDD(List(
    1.2.5.9
), 2)
val value2 = sc.makeRDD(List(
    5.7.8.9
), 2)
value1.zip(value2).collect().foreach(println)
sc.stop()
Copy the code

17. partitionBy

Repartitions the data according to the specified Partitioner. The default Spark partitioner is the HashPartitioner

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3), ("b".3), ("b".3),
    ("b".4), ("b".5), ("a".6)),4)
value.partitionBy(new HashPartitioner(2)).saveAsTextFile("out1")
sc.stop()
Copy the code

18. reduceByKey

You can aggregate data against values based on the same Key

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.reduceByKey(_ + _).collect().foreach(println)
sc.stop()
Copy the code

19. groupByKey

Group data source values by key

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.groupByKey().collect().foreach(println)
sc.stop()
Copy the code

20. aggregateByKey

Data is computed within and between partitions according to different rules

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.aggregateByKey(0)(_ + _, _ * _).collect().foreach(println)
sc.stop()
Copy the code

21. foldByKey

When the calculation rules within and between partitions are the same, the aggregateByKey can be simplified to foldByKey

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.foldByKey(0)(_ + _).collect().foreach(println)
sc.stop()
Copy the code

22. combineByKey

The most common aggregation function for key-value RDD

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".4), ("b".5), ("a".6)),2)
value.combineByKey(
    (_, 1),
    (acc: (Int.Int), v) => (acc._1 + v, acc._2 + 1),
    (acc1: (Int.Int), acc2: (Int.Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
val value1 = value.combineByKey(
    (_, 1),
    (t: (Int.Int), c) => {
        (t._1, t._2 + c)
    }
    , (t: (Int.Int), c: (Int.Int)) => {
        (t._1, t._2 + c._2)
    }
)
value1.collect().foreach(println)
sc.stop()
Copy the code

23. sortByKey

Called on a (K,V) RDD, K must implement the Ordered interface, which returns a key Ordered

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value = sc.makeRDD(List(("a".1), ("a".2), ("b".3),
    ("b".7), ("b".5), ("a".6)),2)
value.sortByKey(ascending = false).collect().foreach(println)
sc.stop()
Copy the code

24. Join, leftOuterJoin, rightOuterJoin, fullOuterJoin

Called on RDD of type (K,V) and (K,W), returns a RDD of (K,(V,W) where all elements corresponding to the same key are joined together

Cartesian products may occur in Hive-like SQL

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value1 = sc.makeRDD(List((1.2), (2.3), (3.2), (4.1)))val value2 = sc.makeRDD(List((1."1111"), (3."3333"), (2."2222"), (5."2222")
))

value1.join(value2).collect().foreach(println)
println("-- -- -- -- -- -- -- -- -- -- -- -- -- -")
value1.leftOuterJoin(value2).collect().foreach(println)
println("-- -- -- -- -- -- -- -- -- -- -- -- -- -")
value1.rightOuterJoin(value2).collect().foreach(println)
println("-- -- -- -- -- -- -- -- -- -- -- -- -- -")
value1.fullOuterJoin(value2).collect().foreach(println)
sc.stop()
Copy the code

26. cogroup

Called on RDD of type (K,V) and (K,W), returns an RDD of type (K,(Iterable<V>,Iterable<W>))

// Use examples
val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("MapPartitions"))val value1 = sc.makeRDD(List((1.2), (2.3), (3.2), (2.33), (4.1)))val value2 = sc.makeRDD(List((1."1111"), (3."3333"), (2."2222"), (5."2222"), (2."2222")
))
value1.cogroup(value2).collect().foreach(println)
sc.stop()
Copy the code