Abstract:
RDD: Resilient Distributed Dataset, a special collection that supports multiple data sources, has a fault-tolerance mechanism, can be cached, and supports parallel operations. An RDD represents a partitioned data set.
RDDs have two kinds of operators:
Transformation: lazily evaluated. When one RDD is transformed into another, the transformation is not executed immediately; Spark only records the logical operation on the data set.
Action: triggers a Spark job to run, which actually performs the computation of the preceding Transformation operators.
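As a minimal sketch of this laziness (the object and variable names here are illustrative, not from the original):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object LazyDemo {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("lazy")
    val sc = new SparkContext(conf)
    // Transformation: only the lineage is recorded; nothing runs yet
    val doubled = sc.parallelize(1 to 10).map(_ * 2)
    // Action: count() triggers the job that actually evaluates the map above
    println(doubled.count())   // 10
    sc.stop()
  }
}
```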
Basic transformation operations:
1. map(func): transforms each element in the data set with a user-defined function, forming a new RDD (a MappedRDD).
(Example 1):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object Map {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("map")
    val sc = new SparkContext(conf)
    val rdd = sc.parallelize(1 to 10)   // create the RDD
    val map = rdd.map(_ * 2)            // apply the function to each element
    map.foreach(x => print(x + " "))
    sc.stop()
  }
}
```

Output:
2 4 6 8 10 12 14 16 18 20
(RDD dependency diagram: red blocks represent an RDD, black blocks represent its partitions; the same applies below)
2. flatMap(func): similar to map, but each input element can be mapped to zero or more output elements; the results are "flattened" into a single collection before output.
(Example 2):

```scala
// ... SparkConf / SparkContext setup omitted (same as Example 1)
val rdd = sc.parallelize(1 to 5)
val fm = rdd.flatMap(x => (1 to x)).collect()
fm.foreach(x => print(x + " "))
```

Output:
1 1 2 1 2 3 1 2 3 4 1 2 3 4 5
With the map function instead, the output would be:
Range(1) Range(1, 2) Range(1, 2, 3) Range(1, 2, 3, 4) Range(1, 2, 3, 4, 5)
3. mapPartitions(func): similar to map, but map applies its function to each element, whereas mapPartitions applies its function to each partition as a whole. Assuming the RDD's element type is T, func must have the type:
Iterator[T] => Iterator[U]
Assuming there are N elements and M partitions, the map function will be called N times while the mapPartitions function will be called only M times. This makes mapPartitions more efficient when objects must be created during the mapping process. For example, when writing data to a database, map would create a connection object for every element, whereas mapPartitions creates a connection object only once per partition, as sketched below.
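A minimal sketch of that pattern, assuming hypothetical helpers createConnection and save standing in for a real database driver:

```scala
// One connection per partition instead of one per element.
val written = rdd.mapPartitions { iter =>
  val conn = createConnection()   // hypothetical helper: open one DB connection
  // Materialize the writes before closing the connection (iterators are lazy)
  val out = iter.map { elem => save(conn, elem); elem }.toList
  conn.close()
  out.iterator
}
```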
(Example 3): output the names of all females:
```scala
// The parts in /*...*/ comments are enabled in Example 4 (mapPartitionsWithIndex)
def partitionsFun(/*index: Int,*/ iter: Iterator[(String, String)]): Iterator[String] = {
  var woman = List[String]()
  while (iter.hasNext) {
    val next = iter.next()
    next match {
      case (_, "female") => woman = /*"[" + index + "]" +*/ next._1 :: woman
      case _ =>
    }
  }
  woman.iterator
}

def main(args: Array[String]) {
  val conf = new SparkConf().setMaster("local").setAppName("mappartitions")
  val sc = new SparkContext(conf)
  val l = List(("kpop", "female"), ("zorro", "male"), ("mobin", "male"), ("lucy", "female"))
  val rdd = sc.parallelize(l, 2)
  val mp = rdd.mapPartitions(partitionsFun)
  // val mp = rdd.mapPartitionsWithIndex(partitionsFun)   // used in Example 4
  mp.collect.foreach(x => print(x + " "))
  sc.stop()
}
```
Output:
kpop lucy
The same result can in fact be achieved in a single statement:

```scala
val mp = rdd.mapPartitions(x => x.filter(_._2 == "female")).map(x => x._1)
```
It is not done that way above only so that the definition of the partition function can be demonstrated.
(RDD dependency diagram)
4. mapPartitionsWithIndex(func): similar to mapPartitions, except that func takes an extra parameter carrying the partition index, so its type is:
(Int, Iterator[T]) => Iterator[U]
(Example 4): enable the /*...*/ parts in Example 3 (the index parameter and the "[" + index + "]" prefix) and use the mapPartitionsWithIndex line instead of the mapPartitions line.
Output (with partition index):
[0]kpop [1]lucy
5. sample(withReplacement, fraction, seed): randomly samples a fraction of the data using the given random seed. withReplacement indicates whether sampled data is put back: true means sampling with replacement, false means sampling without replacement.
(Example 5): randomly sample 50% of the data from the RDD, with replacement, using a random seed value of 3:
```scala
// ... setup omitted
val rdd = sc.parallelize(1 to 10)
val sample1 = rdd.sample(true, 0.5, 3)
sample1.collect.foreach(x => print(x + " "))
sc.stop()
```
6. union(otherDataset): merges the data sets of two RDDs and returns their union. Duplicate elements are not removed.
```scala
// ... setup omitted
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val unionRDD = rdd1.union(rdd2)
unionRDD.collect.foreach(x => print(x + " "))
sc.stop()
```

Output:
1 2 3 3 4 5
7. intersection(otherDataset): returns the intersection of two RDDs.
```scala
// ... setup omitted
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(3 to 5)
val intersectionRDD = rdd1.intersection(rdd2)
intersectionRDD.collect.foreach(x => print(x + " "))
sc.stop()
```

Output:
3
8. distinct([numTasks]): removes duplicate elements from the RDD.
```scala
// ... setup omitted
val list = List(1, 1, 2, 5, 2, 9, 6, 1)
val distinctRDD = sc.parallelize(list).distinct()
distinctRDD.collect.foreach(x => print(x + " "))
```

Output:
1 6 9 5 2
9. cartesian(otherDataset): computes the Cartesian product of all elements of the two RDDs.
```scala
// ... setup omitted
val rdd1 = sc.parallelize(1 to 3)
val rdd2 = sc.parallelize(2 to 5)
val cartesianRDD = rdd1.cartesian(rdd2)
cartesianRDD.foreach(x => println(x + " "))
```

Output:
(1,2) (1,3) (1,4) (1,5) (2,2) (2,3) (2,4) (2,5) (3,2) (3,3) (3,4) (3,5)
(RDD dependency diagram)
10. coalesce(numPartitions, shuffle): repartitions the RDD. shuffle defaults to false; when shuffle = false, the partition count cannot be increased. No error is reported in that case; the partition count simply remains what it was.
(Example 10.1): shuffle = false
```scala
// ... setup omitted
val rdd = sc.parallelize(1 to 16, 4)
// When shuffle is false, the partition count cannot be increased (e.g. not from 4 to 7)
val coalesceRDD = rdd.coalesce(3)
println("Number of partitions after repartitioning: " + coalesceRDD.partitions.size)
```

Output:
Number of partitions after repartitioning: 3
Data in each partition: List(1, 2, 3, 4) List(5, 6, 7, 8) List(9, 10, 11, 12, 13, 14, 15, 16)
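To confirm that shuffle = false cannot increase the partition count, a quick check (a sketch reusing the 4-partition rdd above) might look like this:

```scala
val tryMore = rdd.coalesce(7)        // shuffle defaults to false
println(tryMore.partitions.size)     // still prints 4; no error is thrown
```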
(Example 10.2): shuffle = true
```scala
// ... setup omitted
val rdd = sc.parallelize(1 to 16, 4)
val coalesceRDD = rdd.coalesce(7, true)
println("Number of partitions after repartitioning: " + coalesceRDD.partitions.size)
println("RDD dependencies: " + coalesceRDD.toDebugString)
```

Output:
Number of partitions after repartitioning: 5
RDD dependencies:
(5) MapPartitionsRDD[4] at coalesce at Coalesce.scala:14 []
 |  CoalescedRDD[3] at coalesce at Coalesce.scala:14 []
 |  ShuffledRDD[2] at coalesce at Coalesce.scala:14 []
 +-(4) MapPartitionsRDD[1] at coalesce at Coalesce.scala:14 []
    |  ParallelCollectionRDD[0] at parallelize at Coalesce.scala:13 []
Data in each partition after repartitioning:
List(10, 13) List(1, 5, 11, 14) List(2, 6, 12, 15) List(3, 7, 16) List(4, 8, 9)
(RDD dependency diagram: coalesce(3, false))
(RDD dependency diagram: coalesce(3, true))
11. repartition(numPartitions): implemented as coalesce(numPartitions, shuffle = true); the effect is the same as coalesce(7, true) in Example 10.2, as the sketch below shows.
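A small sketch of this equivalence (reusing the rdd from Example 10.2; variable names are illustrative):

```scala
val byRepartition = rdd.repartition(7)   // internally calls coalesce(7, shuffle = true)
val byCoalesce = rdd.coalesce(7, true)
// Both go through a shuffle and end up with the same partition count
println(byRepartition.partitions.size == byCoalesce.partitions.size)   // true
```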
12. glom(): converts the elements of type T in each partition of the RDD into an array, yielding an RDD[Array[T]].
```scala
// ... setup omitted
val rdd = sc.parallelize(1 to 16, 4)
val glomRDD = rdd.glom()   // RDD[Array[Int]]
glomRDD.foreach(arr => println(arr.getClass.getSimpleName))
sc.stop()
```

Output:
int[]   // the elements in each partition have been converted into an Array[Int]
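glom is also a convenient way to inspect how elements are distributed across partitions; a sketch reusing the same rdd, producing per-partition Lists like those shown in the coalesce examples:

```scala
// Prints one line per partition, e.g. List(1, 2, 3, 4) ...
rdd.glom().collect().foreach(arr => println(arr.toList))
```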
13. randomSplit(weights: Array[Double], seed): splits an RDD into multiple RDDs according to the weights array; the higher a weight, the more elements the corresponding RDD is allocated.
```scala
// ... setup omitted
val rdd = sc.parallelize(1 to 10)
val randomSplitRDD = rdd.randomSplit(Array(1.0, 2.0, 7.0))
randomSplitRDD(0).foreach(x => print(x + " "))
randomSplitRDD(1).foreach(x => print(x + " "))
randomSplitRDD(2).foreach(x => print(x + " "))
sc.stop()
```

Output:
2 4 3 8 9 1 5 6 7 10