This is the fifth day of my participation in Gwen Challenge
Introduction
In terms of data operations, RDD operators fall into two kinds: transformations and actions.
- Transformation operator: converts one RDD into another RDD. It only stacks up the computation (decorator design pattern) and does not actually execute anything.
- Action operator: actually triggers SparkContext to submit a Job.
This article covers the transformation operators.
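A minimal sketch of the lazy evaluation described above, assuming a local SparkContext named sc:
// Transformations only build up the lineage; nothing runs yet
val doubled = sc.makeRDD(List(1, 2, 3)).map(_ * 2)
// The action collect() is what actually submits the job
doubled.collect().foreach(println)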
1. map
Maps and transforms the elements of the data set one by one; the transformation can change the value or the type.
// Usage example
import org.apache.spark.{SparkConf, SparkContext} // needed by all the examples in this article

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
sc.makeRDD(List(1, 2, 3, 4, 5), 2).map(("num", _)).collect().foreach(println)
sc.stop()
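The transformation can change either the value or the element type; a small illustrative sketch (assuming a SparkContext sc):
val rdd = sc.makeRDD(List(1, 2, 3))
rdd.map(_ * 2)       // value conversion: Int => Int
rdd.map(_.toString)  // type conversion:  Int => String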
2. mapPartitions
The data is sent to the compute nodes for processing one partition at a time. The function receives the whole partition as an iterator and can do arbitrary processing, including filtering out elements (see the sketch after the example).
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
sc.makeRDD(List(1, 2, 3, 4, 5), 2).mapPartitions(iter => {
  // materialize the iterator first: it can only be traversed once
  val data = iter.toList
  println("--------------------" + data.mkString(","))
  data.iterator
}).collect()
sc.stop()
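Because the whole partition arrives as one iterator, filtering inside mapPartitions is just a matter of returning a smaller iterator; a minimal sketch (assuming a SparkContext sc):
// Keep only the even numbers of each partition
val evens = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
  .mapPartitions(iter => iter.filter(_ % 2 == 0))
evens.collect().foreach(println) // 2, 4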
3. mapPartitionsWithIndex
The data is sent to the compute nodes for processing one partition at a time, just like mapPartitions, but the function also receives the index of the current partition, so you can process (or filter) data differently per partition.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
sc.makeRDD(List(1, 2, 3, 4, 5), 2).mapPartitionsWithIndex((index, iter) => {
  // materialize the iterator first: it can only be traversed once
  val data = iter.toList
  println(index + "--------------------" + data.mkString(","))
  data.iterator
}).collect()
sc.stop()
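The partition index is handy when you only want the data of a particular partition; a small sketch keeping partition 1 only (assuming a SparkContext sc):
val onlyPartition1 = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
  .mapPartitionsWithIndex((index, iter) => if (index == 1) iter else Iterator.empty)
onlyPartition1.collect().foreach(println) // 3, 4, 5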
4. flatMap
Each element is mapped to a collection and the results are then flattened, which is why the operator is also called a flat map.
// Usage example
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val context = new SparkContext(sparkConf)
val lines = context.textFile("data/word.txt")
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).foreach(println)
context.stop()
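The difference from map is in the result type: map keeps one output element per input element, while flatMap flattens the collections it produces; a small sketch (assuming a SparkContext sc):
val lines = sc.makeRDD(List("hello spark", "hello scala"))
lines.map(_.split(" "))      // RDD[Array[String]]: 2 elements, each an array of words
lines.flatMap(_.split(" "))  // RDD[String]: 4 individual words after flattening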
5. glom
Turns the data of each partition into an array of the same element type; the partitioning itself is unchanged.
// Usage example
import org.apache.spark.rdd.RDD

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val glom: RDD[Array[Int]] = sc.makeRDD(List(1, 2, 3, 4, 5), 2).glom()
val map = glom.map(arr => {
  arr.sum
})
map.collect().foreach(println)
sc.stop()
6. groupBy
Groups data according to the specified rule. The number of partitions stays the same by default, but the data is shuffled and regrouped; in extreme cases all data may land in the same partition.
All data of one group ends up in one partition, but a partition is not limited to a single group.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.groupBy(_._1).collect().foreach(println)
sc.stop()
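The grouping rule can be any function of the element, not just the key of a pair; a small sketch grouping numbers by parity (assuming a SparkContext sc):
sc.makeRDD(List(1, 2, 3, 4, 5), 2)
  .groupBy(_ % 2)
  .collect()
  .foreach(println) // e.g. (0,CompactBuffer(2, 4)) and (1,CompactBuffer(1, 3, 5))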
7. filter
Filters data according to the specified rule: elements that satisfy the rule are kept and the rest are discarded.
After filtering, the number of partitions is unchanged, but the data in the partitions may become unbalanced, which can lead to data skew in a production environment.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.filter(_._2 > 3).collect().foreach(println)
sc.stop()
8. sample
Samples data from the data set according to the specified rule: withReplacement decides whether an element can be drawn more than once, and the fraction is the probability of keeping each element (without replacement) or the expected number of times each element is drawn (with replacement).
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.sample(withReplacement = false, 0.5).collect().foreach(println)
value.sample(withReplacement = true, 0.7).collect().foreach(println)
sc.stop()
9. distinct
Removes duplicate elements from the data set.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.distinct().collect().foreach(println)
sc.stop()
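Under the hood distinct relies on a shuffle; conceptually it is close to the following reduceByKey pattern (a sketch of the idea, not the exact Spark source, using the value RDD above):
val deduped = value
  .map(x => (x, null))              // pair each element with a dummy value
  .reduceByKey((first, _) => first) // identical elements collapse into one key
  .map(_._1)                        // drop the dummy value again
deduped.collect().foreach(println)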
10. coalesce
Reduces the number of partitions according to the data volume, which improves execution efficiency after a large data set has been filtered down to a small one.
When Spark ends up with too many small tasks, coalesce can merge partitions and cut the cost of task scheduling.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 4)
// By default coalesce can only reduce the number of partitions; enable shuffle to increase it
value.coalesce(2).saveAsTextFile("out1")
sc.stop()
11. repartition
Internally calls coalesce with shuffle = true by default. Because of the shuffle, repartition works both when going from many partitions to few and when going from few partitions to many.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 4)
value.repartition(6).saveAsTextFile("out3")
sc.stop()
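To make the relationship explicit, the following two calls are equivalent (a sketch, using the value RDD above):
value.repartition(6)              // always shuffles
value.coalesce(6, shuffle = true) // what repartition calls internally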
12. sortBy
Sorts the data. Each element is first passed through the function f, and the data is then sorted by the result of f, in ascending order by default. The number of partitions of the new RDD equals that of the original RDD, and a shuffle is involved.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(
  1, 3, 5, 6, 7, 8, 32, 23, 4, 5, 62, 12, 4
), 4)
// ascending = false sorts in descending order
value.sortBy(i => i, numPartitions = 1, ascending = false).saveAsTextFile("out3")
sc.stop()
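Because sortBy sorts by the result of the function, it can sort by any derived key; a small sketch sorting key/value pairs by their value in descending order (assuming a SparkContext sc):
sc.makeRDD(List(("a", 3), ("b", 1), ("c", 2)))
  .sortBy(_._2, ascending = false)
  .collect()
  .foreach(println) // (a,3), (c,2), (b,1)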
13. intersection
Returns a new RDD containing the intersection of the source RDD and the argument RDD.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value1 = sc.makeRDD(List(
  1, 2, 3, 4, 5, 9
), 2)
val value2 = sc.makeRDD(List(
  5, 7, 8, 9
), 2)
value1.intersection(value2).saveAsTextFile("out2")
sc.stop()
14. union
Returns a new RDD containing the union of the source RDD and the argument RDD; note that union does not deduplicate.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value1 = sc.makeRDD(List(
  1, 2, 3, 4, 5, 9
), 2)
val value2 = sc.makeRDD(List(
  5, 7, 8, 9
), 2)
value1.union(value2).collect().foreach(println)
sc.stop()
15. subtract
Takes the source RDD as the primary set and removes from it every element that also appears in the argument RDD, keeping the rest; in other words, the set difference.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value1 = sc.makeRDD(List(
  1, 2, 3, 4, 5, 9
), 2)
val value2 = sc.makeRDD(List(
  5, 7, 8, 9
), 2)
value1.subtract(value2).collect().foreach(println)
sc.stop()
16. zip
Combines the elements of two RDDs into key/value pairs: the key is the element from the first RDD and the value is the element at the same position in the second RDD. The two RDDs must have the same number of partitions and the same number of elements in each partition.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value1 = sc.makeRDD(List(
  1, 2, 5, 9
), 2)
val value2 = sc.makeRDD(List(
  5, 7, 8, 9
), 2)
value1.zip(value2).collect().foreach(println)
sc.stop()
17. partitionBy
Repartitions the data according to the given Partitioner; Spark's default partitioner is HashPartitioner. This operator is only available on key/value RDDs.
// Usage example
import org.apache.spark.HashPartitioner

val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 3), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 4)
value.partitionBy(new HashPartitioner(2)).saveAsTextFile("out1")
sc.stop()
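Besides HashPartitioner, you can plug in your own Partitioner; a hypothetical sketch that sends key "a" to partition 0 and everything else to partition 1 (the class name and rule are made up for illustration):
import org.apache.spark.Partitioner

class LetterPartitioner extends Partitioner {
  override def numPartitions: Int = 2
  override def getPartition(key: Any): Int = if (key == "a") 0 else 1
}

value.partitionBy(new LetterPartitioner).collect().foreach(println)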
18. reduceByKey
Aggregates the values that share the same key.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.reduceByKey(_ + _).collect().foreach(println)
sc.stop()
19. groupByKey
Group data source values by key
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.groupByKey().collect().foreach(println)
sc.stop()
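For comparison, a sum per key can be written with either operator; reduceByKey pre-aggregates inside each partition before the shuffle, while groupByKey shuffles every value first (a sketch, using the value RDD above):
value.reduceByKey(_ + _)                               // partial sums are combined before shuffling
value.groupByKey().map { case (k, vs) => (k, vs.sum) } // all values are shuffled, then summed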
20. aggregateByKey
Aggregates data with one rule inside each partition and a different rule between partitions; the first argument list supplies the initial value, the second supplies the intra-partition and inter-partition functions.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.aggregateByKey(0)(_ + _, _ * _).collect().foreach(println)
sc.stop()
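A classic use is applying genuinely different rules in the two phases, for example taking the maximum inside each partition and then adding those maxima together (a sketch, using the value RDD above):
value
  .aggregateByKey(0)((part, v) => math.max(part, v), _ + _)
  .collect()
  .foreach(println)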
21. foldByKey
When the intra-partition and inter-partition rules are the same, aggregateByKey can be simplified to foldByKey.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
value.foldByKey(0)(_ + _).collect().foreach(println)
sc.stop()
22. combineByKey
The most general aggregation function for key/value RDDs: the first value of each key is turned into an initial combiner, which is then merged with the remaining values inside a partition and with other combiners across partitions.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 4), ("b", 5), ("a", 6)), 2)
// (sum, count) per key: the first value becomes (v, 1); later values add to the sum and bump the count
val sumAndCount = value.combineByKey(
  (_, 1),
  (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),
  (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)
)
sumAndCount.collect().foreach(println)
// A second variant with different merge functions
val value1 = value.combineByKey(
  (_, 1),
  (t: (Int, Int), c) => {
    (t._1, t._2 + c)
  },
  (t: (Int, Int), c: (Int, Int)) => {
    (t._1, t._2 + c._2)
  }
)
value1.collect().foreach(println)
sc.stop()
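A common use of the (sum, count) pattern above is computing the average value per key; a small follow-up sketch using the sumAndCount RDD from the example:
val avgByKey = sumAndCount.map { case (k, (sum, cnt)) => (k, sum.toDouble / cnt) }
avgByKey.collect().foreach(println) // (a,3.0), (b,4.0)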
23. sortByKey
Called on a (K, V) RDD where K has an Ordering (for example, it implements the Ordered trait); returns an RDD sorted by key, ascending by default.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),
  ("b", 7), ("b", 5), ("a", 6)), 2)
value.sortByKey(ascending = false).collect().foreach(println)
sc.stop()
24. Join, leftOuterJoin, rightOuterJoin, fullOuterJoin
Called on RDDs of type (K, V) and (K, W); join returns an RDD of (K, (V, W)) in which elements with the same key are paired up. The outer variants wrap the side that may be missing in an Option: leftOuterJoin returns (K, (V, Option[W])), rightOuterJoin returns (K, (Option[V], W)), and fullOuterJoin returns (K, (Option[V], Option[W])).
As with joins in Hive SQL, a key that appears multiple times on both sides produces a Cartesian product of its matches.
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value1 = sc.makeRDD(List((1, 2), (2, 3), (3, 2), (4, 1)))
val value2 = sc.makeRDD(List((1, "1111"), (3, "3333"), (2, "2222"), (5, "2222")))
value1.join(value2).collect().foreach(println)
println("---------------")
value1.leftOuterJoin(value2).collect().foreach(println)
println("---------------")
value1.rightOuterJoin(value2).collect().foreach(println)
println("---------------")
value1.fullOuterJoin(value2).collect().foreach(println)
sc.stop()
25. cogroup
Called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W])).
// Usage example
val sc = new SparkContext(
  new SparkConf().setMaster("local[*]").setAppName("MapPartitions")
)
val value1 = sc.makeRDD(List((1, 2), (2, 3), (3, 2), (2, 33), (4, 1)))
val value2 = sc.makeRDD(List((1, "1111"), (3, "3333"), (2, "2222"), (5, "2222"), (2, "2222")))
value1.cogroup(value2).collect().foreach(println)
sc.stop()