Transformation operators
1.1 Value type
1.1.1 map operator
Introduction:
Returns a new RDD consisting of each input element transformed by the function func; in other words, the function is invoked once for every element of the RDD.
Code:
// Create SparkConf to set the local running mode
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("MapOperator")
// Create SparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// Create data
val rdd = sc.parallelize(List("Li bai"."Han xin"."Zhang fei")).cache()
// Use the Map operator
val result = rdd.map((x) => (x,1))
// Print the result
result.foreach((x) => println(x.toString()))
// Close SparkContext
sc.stop()
1.1.2 mapPartitions operator
Introduction:
When running on an RDD of type T, the func function must be of type Iterator[T] => Iterator[U]. Assuming there are N elements and M partitions, map is called N times while mapPartitions is called only M times: one function call processes all the elements of a single partition at once.
Code:
// Create SparkConf to set the local running mode
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("MapPartitionsOperator")
// Create SparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
/** Similar to map, but runs independently on each partition of the RDD, whereas map runs once per element */
// Create data
val rdd = sc.parallelize(List("Li bai"."Han xin"."Zhang fei")).cache()
// Use MapParatitions
// add: {usually used when writing code block (is a single line of code can be used directly
def fun(x : Iterator[String]): Iterator[Tuple2[String, Int]] = {// Create a collection of type Tuple to store data
// List needs to be created as var because concatenation needs to point to the new List object
var list = List[Tuple2[String, Int]]()
while (x.hasNext) {
// x
var elem = x.next()
// Store data to List and add data to List using concatenated collection
list = list.:::(List(new Tuple2[String, Int](elem, 1)))
// Note: the difference between ::: and :: is that ::: takes a List while :: takes a single element
}
list.iterator
}
// You can pass an anonymous function directly, or use the named function defined above
// rdd.mapPartitions(fun)
val result = rdd.mapPartitions { x =>
// Create a collection of type Tuple to store data
// List needs to be created as var because concatenation needs to point to the new List object
var list = List[Tuple2[String, Int]]()
while (x.hasNext) {
// x
var elem = x.next()
// Store data to List and add data to List using concatenated collection
list = list.:::(List(new Tuple2[String, Int](elem, 1)))
// Note: the difference between ::: and :: is that ::: takes a List while :: takes a single element
}
list.iterator
}
// Iterate over the result and print
result.foreach(println(_))
// Close SparkContext
sc.stop()
1.1.3 mapPartitionsWithIndex operator
- mapPartitionsWithIndex(func) is similar to mapPartitions, but func takes an extra integer parameter indicating the index of the partition, i.e. mapPartitions with a partition index
- So, when running on an RDD of type T, the func function must be of type (Int, Iterator[T]) => Iterator[U];
- Requirement: Create an RDD in which each element is paired with its partition index to form a new RDD
Code:
// Indexed MapPartitions
// Create SparkConf to set the local running mode
val conf = new SparkConf()
.setMaster("local[1]")
.setAppName("MapPartitionsWithIndexOperator")
// Create SparkContext
val sc = new SparkContext(conf)
sc.setLogLevel("ERROR")
// Create data
val rdd = sc.parallelize(List("Li bai"."Han xin"."Zhang fei")).cache()
def fun(index: Int, x: Iterator[String]): Iterator[Tuple2[Int, String]] = {
// List needs to be created as var because concatenation needs to point to the new List object
var list = List[Tuple2[Int, String]]()
while(x.hasNext){
// Get the elements in the iterator
var elem = x.next()
// Note: the argument to .::() is a single element, while the argument to .:::() is a List
list = list.::(new Tuple2[Int, String](index, elem))
}
list.iterator
}
val result = rdd.mapPartitionsWithIndex(fun)
// Iterate over the result and print
result.foreach(println(_))
// Close SparkContext
sc.stop()
1.1.4 flatMap operator
Similar to map, but each input element can be mapped to zero or more output elements (so func should return a sequence rather than a single element).
For example, where map would return:
List(1, 2, 3)
List(1, 2, 3, 4)
List(1, 2, 3, 4, 5)
flatMap flattens the data into a single sequence:
1 2 3 1 2 3 4 1 2 3 4 5
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("FlatMapOperator")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
/** 1. parallelize(seq, numSlices): seq is a local collection (List, ...), numSlices is the number of partitions; 2. makeRDD works the same way */
val rdd = sc.makeRDD(1 to 5)
val flatMapResult = rdd.flatMap(1 to _)
val mapResult = rdd.map(1 to _)
// Compare the map and flatMap results
mapResult.foreach(println(_))
flatMapResult.foreach(println(_))
// Close SparkContext
sc.stop()
1.1.5 Difference between map and mapPartitions
- map(): processes one element at a time.
- mapPartitions(): processes all the data of one partition at a time. The data of that partition in the original RDD can only be released after the whole partition has been processed, which may lead to OOM.
- Development guidance: when memory is sufficient, mapPartitions() is recommended for better efficiency (see the sketch below).
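A minimal sketch of the difference, assuming the SparkContext sc from the examples above; both calls produce the same elements, but the function passed to mapPartitions receives a whole partition as an Iterator, which is where per-partition setup (for example a database connection) would go:
val data = sc.parallelize(1 to 6, 2)                      // 6 elements, 2 partitions
// map: the function body runs once per element (6 times)
val viaMap = data.map(x => x * 2)
// mapPartitions: the function body runs once per partition (2 times)
val viaMapPartitions = data.mapPartitions(iter => iter.map(_ * 2))
// Both produce the same elements: 2, 4, 6, 8, 10, 12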
1.1.6 glom operator
- Function: groups the data of each partition into an array, producing an RDD[Array[T]]
- Requirement: Create a 4-partition RDD and put the data for each partition into an array
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
//1. Function: group the data of each partition into an array, producing RDD[Array[T]]
//2. Requirements: Create a 4-partition RDD and place the data from each partition into an array
// Generate RDD data and set partition to 4
val rdd = sc.parallelize(1 to 10, 4)
// Note the return value
val result : RDD[Array[Int]] = rdd.glom()
result.foreach(arr => {
// ARR is Array type
for (i <- 0 until arr.length) {
// Print the result
println(arr(i))
}
println("= = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = = =")})/ / close SparkContext
sc.stop()
1.1.7 groupBy operator
- Function: Group by the return value of the passed function. Values that produce the same key are put into one iterator.
- Requirement: Create an RDD and group its elements by the key returned from a grouping function (here, by value range)
- Note: this operator is not efficient and is not recommended. For details, see the note on the corresponding method in the source code
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 10)
val result : RDD[(String, Iterable[Int])]= rdd.groupBy(x => {
var key = ""
x match {
case _ if (x < 3) => {
key = "small"
}
case _ if (x > 3 && x < 5) => {
key = "big"
}
case _ if (x > 5) => {
key = "very big"
}
case _ => {
key = "void"
}
}
key
})
// Iterate over the result
result.foreach(x => {
println("key : " + x._1 + " \t" + x._2)
})
sc.stop()
1.1.8 filter operator
- Function: filter. Returns a new RDD consisting of the input elements for which the func function returns true.
- Requirement: Create an RDD of numbers and filter out a new RDD containing only the even elements
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
val rdd = sc.parallelize(1 to 10)
// filter: returns a new RDD consisting of the input elements for which the func function returns true
val result : RDD[Int] = rdd.filter(x => {
x % 2 == 0
})
// Print the result
result.foreach(println(_))
sc.stop()
1.1.9 sample operator
- Function: Randomly samples a fraction of the data with the specified random seed.
fraction, with a value in [0, 1], is the expected proportion of data to sample. For example, fraction = 0.3 samples about 30% of the data.
withReplacement indicates whether the sampled data is put back: true samples with replacement, false without; seed specifies the random number generator seed.
- Requirement: Create an RDD and sample from it both with and without replacement
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 200)
// Sampling
/** Function: randomly samples fraction of the data with the specified random seed. fraction, in [0, 1], is the proportion of data to sample (e.g. fraction = 0.3 samples about 30%). withReplacement = true samples with replacement, false without; seed specifies the random number generator seed. */
val result = rdd.sample(true, 0.2, 1234L)
// Traverse
result.foreach(println(_))
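// The requirement also asks for sampling without replacement; a minimal sketch reusing the same fraction and seed:
val noReplacement = rdd.sample(false, 0.2, 1234L)
noReplacement.foreach(println(_))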
// Close SparkContext
sc.stop()
1.1.10 distinct operator
- Effect: Returns a new RDD with the duplicates of the source RDD removed. By default the result keeps the parent RDD's number of partitions, but this can be changed by passing the optional numTasks parameter.
- Requirement: Create an RDD and use distinct() to de-duplicate it
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(List("Zhang"."Korea"."Li"."Korea"."The king"."The king"))
// Use de-weighting
val result = rdd.distinct()
// Print the result
result.foreach(println(_))
sc.stop()
1.1.11 coalesce operator
- Function: Reduces the number of partitions, which improves execution efficiency for the small data set left after filtering a large one.
Keeping many partitions for a small amount of data is wasteful.
- Requirement: Create a 4-partition RDD and scale it down
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Generate data
val rdd = sc.parallelize(1 to 10, 4)
// Shrink the partition
// numPartitions is the target number of partitions; shuffle = true means the data is reshuffled
// Default is false
val result = rdd.coalesce(2, shuffle = true)
// The number of partitions after repartitioning
val numPartitions = result.partitions.size
println(numPartitions)
sc.stop()
1.1.12 repartition operator
- Function: randomly reshuffles all the data across the network into the given number of partitions. The same effect can be achieved with the coalesce operator.
After the shuffle, the data is evenly distributed
- Requirement: Create a 4-partition RDD and repartition it
Code:
def printPartition(rdd : RDD[Int]): Unit ={
rdd.foreachPartition(f => {
// A method to iterate over Iterator data
while(f.hasNext){
var element = f.next()
print(element)
}
println()
})
}
def main(args: Array[String]): Unit = {
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 15, 4)
// Prints data for each partition
printPartition(rdd)
// Repartition and shuffle data
// Shuffle can distribute data evenly
val result = rdd.repartition(3)
println("Repartition and shuffle data.....")
printPartition(result)
sc.stop()
}
1.1.13 Difference between coalesce and Repartition
- coalesce repartitions the RDD and lets you choose whether to perform a shuffle, controlled by shuffle: Boolean = false (default) or true.
- repartition actually calls coalesce with shuffle = true; the source code, followed by a short usage sketch, is shown below:
Code:
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
  coalesce(numPartitions, shuffle = true)
}
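A short usage sketch, assuming the SparkContext sc from the earlier examples, showing the resulting partition counts:
val demoRdd = sc.parallelize(1 to 10, 4)
println(demoRdd.coalesce(2).partitions.size)                  // 2, no shuffle
println(demoRdd.coalesce(2, shuffle = true).partitions.size)  // 2, with shuffle
println(demoRdd.repartition(2).partitions.size)               // 2, always shuffles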
1.1.14 sortBy operator
- Function: sortBy(func, [ascending], [numTasks]) first applies func to the data and then sorts the data by comparing the results; the default order is ascending. Note: the final RDD contains the original elements, sorted by the computed key!
- Requirement: Create an RDD and sort by different rules
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
val rdd = sc.parallelize(List(1, 4, 2, 5, 8, 3))
// Use the function to process the data first, and then sort by the processed result
// Note that the original data is still returned
// With f % 2: 4 % 2 => 0, 1 % 2 => 1
// so even numbers (key 0) are placed before odd numbers (key 1)
val result = rdd.sortBy(f => {
f % 2
})
// Iterate over the result
result.foreach(println(_))
sc.stop()
1.1.15 pipe operator
- Effect: pipe executes a shell script once for each partition and returns its output as an RDD. Note: the script must be placed in a location accessible to the Worker nodes
- Requirement: Write a script and use pipes to apply the script to the RDD.
- Note: This won’t run on Windows for the time being because shell scripts need to be executed
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
val rdd = sc.parallelize(List("Han xin"."White dragon"."Qing emperor"), 1)
val result = rdd.pipe("pipe.sh")
// Print the result
result.foreach(print(_))
sc.stop()
1.2 Two-Value type (interaction between two RDDs)
1.2.1 union operator
- Effect: Returns a new RDD after the union of the source RDD and parameter RDD
- Requirement: Create two RDDs and find their union
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 5)
val otherRdd = sc.parallelize(3 to 6)
// Find the union of two RDD's
val result = rdd.union(otherRdd)
// Print the result
result.foreach(println(_))
sc.stop()
1.2.2 the subtract operator
- What it does: computes the difference set. Elements that appear in both RDDs are removed from the first RDD, and the data unique to the first RDD is kept
Example: rdd: 1,2,3,4,5,6  otherRdd: 4,5,6,7,8
The elements that are in both rdd and otherRdd are removed from rdd, and the rest of rdd is returned
Result: 1,2,3
- Requirement: Create two RDDs and find the difference between the first and the second RDD
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 6)
val otherRdd = sc.parallelize(4 to 8)
// Calculate the difference set of two RDD, remove the same data of two RDD, keep different data
// Be careful!!
// rdd: 1,2,3,4,5,6  otherRdd: 4,5,6,7,8
// It clears data from the RDD and otherRdd and returns the rest of the RDD
val result = rdd.subtract(otherRdd)
// Print the result
result.foreach(println(_))
sc.stop()
1.2.3 intersection operator
- Effect: Returns a new RDD containing the intersection of the source RDD and the parameter RDD
- Requirement: Create two RDDs and find their intersection
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 6)
val otherRdd = sc.parallelize(4 to 8)
// Returns a new RDD containing the intersection of the source RDD and the parameter RDD
val result = rdd.intersection(otherRdd)
// Print the result
result.foreach(println(_))
sc.stop()
1.2.4 cartesian operator
- Function: Cartesian product (try to avoid using it)
- Requirement: Create two RDDs and compute their Cartesian product
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(1 to 6)
val otherRdd = sc.parallelize(4 to 8)
// Compute the Cartesian product
// Use with caution, as the results can be of a large order of magnitude
// For example, the Cartesian product of two RDDs with 100K elements each yields 100K * 100K => 10 billion pairs
val result = rdd.cartesian(otherRdd)
// Print the result
result.foreach(println(_))
sc.stop()
1.2.5 zip operator
- Function: Combines two RDDs into a single key/value RDD. The two RDDs must have the same number of partitions and the same number of elements in each partition, otherwise an exception is thrown.
- Requirement: Create two RDDs and zip them together to form a (K, V) RDD
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(List(1, 2, 3), 1)
val otherRdd = sc.parallelize(List("A", "B", "C"), 1)
// Combine the two RDDs into a key/value RDD. The two RDDs must have the same number of partitions
// and the same number of elements per partition, otherwise an exception will be thrown!
val result = rdd.zip(otherRdd)
result.foreach(println(_))
sc.stop()
1.3 Key-Value type
1.3.1 partitionBy operator
- Partitions the pairRDD with the given partitioner. If the RDD's existing partitioner is the same as the given one, the RDD is not repartitioned; otherwise a ShuffledRDD is generated, i.e. a shuffle occurs.
- Requirement: Create a 4-partition RDD and repartition it
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(Array("AA"."BB"."CC"), 4).map(x => (x,1))
PairRDD is partitioned if the original partionRDD is the same as the existing partionRDD
// If the partition is not performed, ShuffleRDD will be generated, that is, shuffle will be generated.
// Check the number of partitions
println("Partition number: ====>" + rdd.partitions.size)
// Repartition RDD
val result = rdd.partitionBy(new HashPartitioner(2))
println("Number of repartitions: ====>" + result.partitions.size)
sc.stop()
1.3.2 groupByKey operator
- GroupByKey also operates on each key, but produces only a single sequence of values per key.
- Requirement: Create a pairRDD, aggregate the corresponding values of the same key into a sequence, and calculate the sum of the corresponding values of the same key.
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val pairRdd = sc.parallelize(List("hello"."word"."word"."hello"))
.map(x => (x,1))
// Aggregate data with the same key
val result = pairRdd.groupByKey(2).map(x => (x._1, x._2.sum))
// Prints data
result.foreach(println(_))
sc.stop()
1.3.3 reduceByKey operator
- Called on an RDD of (K, V) pairs, returns an RDD of (K, V) in which the values of each key are aggregated using the specified reduce function. The number of reduce tasks can be set with the second optional parameter.
- Requirement: Create a pairRDD that computes the sum of the corresponding values of the same key
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val pairRdd = sc.parallelize(List("hello"."word"."word"."hello"))
.map(x => (x,1))
// Note that both of these are values of value
val result = pairRdd.reduceByKey((v1,v2) => {
v1 + v2
})
// Print the result
result.foreach(println(_))
sc.stop()
1.3.4 Differences between reduceByKey and groupByKey
- reduceByKey: aggregates by key, combining (pre-aggregating) within each partition before the shuffle; returns RDD[K, V].
- groupByKey: groups by key, shuffling all the values directly.
- Development guide: reduceByKey is preferred over groupByKey, but check that the pre-aggregation does not affect your business logic (see the sketch below).
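A minimal word-count sketch, assuming the SparkContext sc from the earlier examples; both operators produce the same counts, but reduceByKey pre-aggregates within each partition before shuffling:
val pairs = sc.parallelize(List("hello", "word", "word", "hello")).map(x => (x, 1))
// reduceByKey: partial sums are computed per partition, then shuffled
val viaReduce = pairs.reduceByKey(_ + _)
// groupByKey: every single 1 is shuffled, then summed on the reduce side
val viaGroup = pairs.groupByKey().map(x => (x._1, x._2.sum))
viaReduce.foreach(println(_))   // (hello,2), (word,2)
viaGroup.foreach(println(_))    // (hello,2), (word,2)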
1.3.5 aggregateByKey operator
Parameters: (zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U)
Function: in an RDD of KV pairs, the values are grouped and merged by key. During the merge, each value and the accumulated value (starting from the initial value) are passed to the seq function, and its result becomes the new accumulated value for that key within the partition; the per-partition results for each key are then merged with the combine function (the first two values are combined, the result is combined with the next value, and so on), and each key with its final result is output as a new KV pair.
Parameter description: (1) zeroValue: the initial value given to each key in each partition; (2) seqOp: the function used to fold each value into the per-partition accumulator, starting from the initial value; (3) combOp: the function used to merge the per-partition results.
Requirement: Create a pairRDD, take the maximum value of the same key for each partition, and add it up
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
val rdd = sc.parallelize(List("A"."B"."C"),2).map((x) => (x,1))
// pairRdd can be converted directly
val pairRdd = sc.parallelize(List(("a".3), ("a".2), ("c".4), ("b".3), ("c".6), ("c".8)),2)
// Take the maximum value of the same key for each partition and add them
// (1) zeroValue: an initial value for each key in each partition;
// (2) seqOp: the function is used to iterate over values with initial values in each partition;
// (3) combOp: the function is used to merge the results in each partition.
// Note: throughout the whole process, the pairRdd keys do not take part in the computation
val result = pairRdd.aggregateByKey(0)((acc, v) => {
// acc is the accumulated value per key within the partition (starting from zeroValue); v is the RDD value
math.max(acc, v)
}, (u1,u2) => {
// Merge the values of value
u1 + u2
})
// Print the result
result.foreach(println(_))
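// Expected output, assuming parallelize splits the six pairs evenly across the two partitions:
// per-partition maxima are (a,3), (c,4) in the first partition and (b,3), (c,8) in the second,
// which combine to (a,3), (b,3), (c,12)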
sc.stop()
1.3.6 foldByKey operator
Parameters: (zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
- Purpose: a simplified form of aggregateByKey in which seqOp and combOp are the same function
- Requirement: Create a pairRDD that computes the sum of the corresponding values of the same key
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// pairRdd can be converted directly
val pairRdd = sc.parallelize(List(("a".3), ("a".2), ("c".4), ("b".3), ("c".6), ("c".8)),2)
AggregateByKey. Seqop is the same as combop
//2. Requirements: Create a pairRDD and calculate the sum of the values of the same key
val result = pairRdd.foldByKey(0)((v1,v2) => {
v1 + v2
})
// Print the result
result.foreach(println(_))
sc.stop()
1.3.7 combineByKey operator
Parameters: (createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
Function: for each key K, combines its values V into a combined result C.
(1) createCombiner: combineByKey() iterates over the elements of a partition, so each element's key has either not been seen yet in that partition or is the same as a previous element's key. For a new key, combineByKey() uses createCombiner() to create the initial value of that key's accumulator.
(2) mergeValue: if the key has already been seen while processing the current partition, mergeValue() merges the key's current accumulator value with the new value.
(3) mergeCombiners: since each partition is processed independently, the same key can have several accumulators. If two or more partitions have an accumulator for the same key, the user-provided mergeCombiners() merges the partition results.
Requirement: Create a pairRDD and compute the mean of each key's values. (First compute, for each key, the number of occurrences and the sum of its values, then divide to get the result.)
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(
Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
val result = rdd.combineByKey(
// createCombiner: the first time a key is seen within a partition, its value is wrapped into an accumulator,
// e.g. ("a", 88) produces the accumulator (88, 1)
(x) => {(x,1)},
// mergeValue: within a partition, further values of an already-seen key are merged into its accumulator,
// e.g. accumulator (88, 1) and value 91 => (179, 2)
(acc:(Int,Int), v) => {(acc._1 + v, acc._2 + 1)},
// mergeCombiners: the accumulators of the same key from different partitions are merged,
// e.g. for "a": (179, 2) from partition 1 and (95, 1) from partition 2 => (274, 3)
(acc1 : (Int,Int), acc2 : (Int, Int)) =>{(acc1._1 + acc2._1, acc1._2 + acc2._2)}
)
result.foreach(println(_))
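// The requirement asks for the mean; a follow-up step (not part of the original listing) dividing sum by count could look like this:
val avgResult = result.map { case (key, (sum, count)) => (key, sum.toDouble / count) }
avgResult.foreach(println(_))   // roughly (a,91.33) and (b,95.33)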
sc.stop()
1.3.8 sortByKey operator
- Function: when called on an RDD of (K, V) pairs where K implements the Ordered trait, returns an RDD of (K, V) pairs sorted by key
- Requirement: Create a pairRDD and sort it by key in both ascending and descending order
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(Array((3, "aa"), (6, "cc"), (2, "bb"), (1, "dd")))
// Sort by key: true = ascending, false = descending
val result = rdd.sortByKey(true)
result.foreach(println(_))
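// Descending order, for the second half of the requirement; a minimal sketch:
val descResult = rdd.sortByKey(false)
descResult.foreach(println(_))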
sc.stop()
1.3.9 mapValues operator
- Function: for an RDD of (K, V) pairs, operates only on the values V
- Requirement: Create a pairRDD and append the string "|||" to each value
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(Array((1, "a"), (1, "d"), (2, "b"), (3, "c")))
// Operate only on value
val result = rdd.mapValues(v => {
v + "|||"
})
// Print the result
result.foreach(println(_))
sc.stop()
1.3.10 join operator
- Function: called on RDDs of type (K, V) and (K, W); returns an RDD of (K, (V, W)) pairs containing all elements whose keys appear in both
- Requirement: Create two pairRDDs and aggregate the data with the same key into a tuple.
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val otherRdd = sc.parallelize(Array((1, "A"), (2, "B"), (3, "C")))
// Called on RDD of type (K,V) and (K,W), returns all pairs of the same key in one
// RDD of (K,(V,W))
val result = rdd.join(otherRdd)
result.foreach(println(_))
sc.stop()
1.3.11 cogroup operator
- Function: called on RDDs of type (K, V) and (K, W); returns an RDD of type (K, (Iterable[V], Iterable[W]))
- Requirement: Create two pairRDDs and aggregate the data with the same key into an iterator.
Code:
// Create SparkContext
val conf = new SparkConf()
.setAppName("Spark APP")
.setMaster("local[1]")
val sc = new SparkContext(conf)
// Set the level of SparkContext print logs
sc.setLogLevel("WARN")
// Create data
val rdd = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c")))
val otherRdd = sc.parallelize(Array((1, "A"), (2, "B"), (3, "C")))
// Called on RDDs of type (K,V) and (K,W); returns an RDD of type
// (K, (Iterable[V], Iterable[W]))
val result = rdd.cogroup(otherRdd)
result.foreach(value => {
val key = value._1
val v1 : Iterable[String] = value._2._1
val v2 : Iterable[String] = value._2._2
print(key + " ")
print(v1 + " ")
print(v2 + " ")
println()
})
sc.stop()