1. Transformation

Common Spark Transformation operators are as follows:

Transformation operator | Meaning
map(func) | Applies func to each element of the source RDD and returns a new RDD.
filter(func) | Returns a new RDD containing only the elements of the source RDD for which func returns true.
flatMap(func) | Similar to map, but each input item can be mapped to zero or more output items (so func must return a Seq rather than a single item).
mapPartitions(func) | Similar to map, but func runs separately on each partition of the RDD. func must be of type Iterator<T> => Iterator<U>, where T is the element type of the RDD, i.e. RDD[T].
mapPartitionsWithIndex(func) | Similar to mapPartitions, but func is of type (Int, Iterator<T>) => Iterator<U>, where the first argument is the partition index.
sample(withReplacement, fraction, seed) | Samples the data. Takes three parameters: whether to sample with replacement (withReplacement), the fraction of the data to sample (fraction), and the seed of the random number generator (seed). In the Scala API only seed may be omitted.
union(otherDataset) | Merges two RDDs.
intersection(otherDataset) | Returns the intersection of two RDDs.
distinct([numTasks]) | Removes duplicate elements.
groupByKey([numTasks]) | When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. Note: if you are grouping in order to perform an aggregation (for example, a sum or an average) over each key, reduceByKey or aggregateByKey performs better. Note: by default, the level of parallelism depends on the number of partitions of the parent RDD; pass the optional numTasks argument to change it.
reduceByKey(func, [numTasks]) | When called on a dataset of (K, V) pairs, groups the data by key and reduces the values of each key with func.
aggregateByKey(zeroValue, [numPartitions])(seqOp, combOp) | When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs in which the values of each key are aggregated using the given combine functions and zeroValue. As with groupByKey, the number of reduce tasks can be configured through the optional second parameter.
sortByKey([ascending], [numTasks]) | Sorts by key. The key type must implement Ordered so that keys can be compared.
join(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs, which is equivalent to an inner join. For outer joins, use leftOuterJoin, rightOuterJoin, or fullOuterJoin.
cogroup(otherDataset, [numTasks]) | When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples.
cartesian(otherDataset) | When called on datasets of types T and U, returns a dataset of (T, U) pairs (the Cartesian product).
coalesce(numPartitions) | Reduces the number of partitions in the RDD to numPartitions.
repartition(numPartitions) | Reshuffles the data in the RDD randomly to create more or fewer partitions and balance the data across them.
repartitionAndSortWithinPartitions(partitioner) | Repartitions the RDD according to the given partitioner and sorts the records in each resulting partition by key. This is more efficient than calling repartition and then sorting within each partition, because the sorting is pushed down into the shuffle machinery.
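
The three partitioning operators at the end of the table have no dedicated example in the sections that follow. The sketch below is one way to try them out; it assumes a local Spark runtime, and the application name, variable names, and the choice of HashPartitioner are illustrative rather than taken from the original article.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("partitioning-sketch").setMaster("local[2]"))

val numbers = sc.parallelize(1 to 12, numSlices = 6)

// coalesce merges existing partitions and, by default, avoids a full shuffle
val fewer = numbers.coalesce(2)
println(fewer.getNumPartitions) // 2

// repartition always shuffles and can increase or decrease the partition count
val more = numbers.repartition(8)
println(more.getNumPartitions) // 8

// repartitionAndSortWithinPartitions is defined on key-value RDDs
val pairs = sc.parallelize(List((3, "c"), (1, "a"), (4, "d"), (2, "b")), numSlices = 2)
val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))

// glom() gathers each partition into an array so the per-partition order is visible
val perPartition = sorted.glom().collect().map(_.toList).toList
perPartition.foreach(println)
// List((2,b), (4,d))
// List((1,a), (3,c))
```

With HashPartitioner(2), even keys land in partition 0 and odd keys in partition 1, and each partition is sorted by key on its own; the RDD as a whole is not globally sorted.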

Basic usage examples of these operators are shown below:

1.1 map

val list = List(1, 2, 3)
sc.parallelize(list).map(_ * 10).foreach(println)

// Output: 10, 20, 30

1.2 filter

val list = List(3, 6, 9, 10, 12, 21)
sc.parallelize(list).filter(_ >= 10).foreach(println)

// Output: 10, 12, 21

1.3 flatMap

flatMap(func) is similar to map, but each input item can be mapped to zero or more output items (so func must return a Seq rather than a single item).

val list = List(List(1, 2), List(3), List(), List(4, 5))
sc.parallelize(list).flatMap(_.toList).map(_ * 10).foreach(println)

// Output: 10, 20, 30, 40, 50

flatMap is used very often in log analysis. Here is an illustration: split each line of input into individual words and pair each word with the value 1, meaning that it appeared once; then group the pairs by word and sum the values to count the total number of occurrences.

val lines = List("spark flume spark", "hadoop flume hive")
// Split on a single space so that each line yields its words
sc.parallelize(lines).flatMap(line => line.split(" ")).
  map(word => (word, 1)).reduceByKey(_ + _).foreach(println)

// Output:
// (spark,2)
// (hive,1)
// (hadoop,1)
// (flume,2)

1.4 mapPartitions

func must be of type Iterator<T> => Iterator<U> (where T is the element type of the RDD); that is, both its input and its output must be iterators.

import scala.collection.mutable.ListBuffer

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitions(iterator => {
  // Collect the transformed elements of this partition into a buffer
  val buffer = new ListBuffer[Int]
  while (iterator.hasNext) {
    buffer.append(iterator.next() * 100)
  }
  buffer.iterator
}).foreach(println)
// Output: 100 200 300 400 500 600
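
Because func receives and returns an iterator, the same transformation can also be written without an intermediate buffer. The following is a minimal sketch of that alternative, assuming a local Spark runtime; the application name and variable names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("mapPartitions-sketch").setMaster("local[2]"))

val list = List(1, 2, 3, 4, 5, 6)

// iterator.map is lazy, so elements are transformed one at a time
// instead of being gathered into a buffer first
val scaled = sc.parallelize(list, 3)
  .mapPartitions(iterator => iterator.map(_ * 100))
  .collect()
  .toList

println(scaled) // List(100, 200, 300, 400, 500, 600)
```

The lazy form avoids holding a whole partition in memory at once, which may matter when partitions are large.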

1.5 mapPartitionsWithIndex

Similar to mapPartitions, but func is of type (Int, Iterator<T>) => Iterator<U>, where the first argument is the partition index.

import scala.collection.mutable.ListBuffer

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list, 3).mapPartitionsWithIndex((index, iterator) => {
  val buffer = new ListBuffer[String]
  while (iterator.hasNext) {
    // Prefix every element with the index of the partition it belongs to
    buffer.append(s"partition $index: ${iterator.next() * 100}")
  }
  buffer.iterator
}).foreach(println)
// Output:
// partition 0: 100
// partition 0: 200
// partition 1: 300
// partition 1: 400
// partition 2: 500
// partition 2: 600

1.6 sample

Data sampling. There are three parameters: whether to sample with replacement (withReplacement), the fraction of the data to sample (fraction), and the seed of the random number generator (seed), which may be omitted:

val list = List(1, 2, 3, 4, 5, 6)
sc.parallelize(list).sample(withReplacement = false, fraction = 0.5).foreach(println)
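
The example above leaves out seed, so each run may return a different sample. As a hedged sketch of the effect of the third parameter, the code below fixes the seed and checks that sampling the same RDD twice gives the same elements. It assumes a local Spark runtime; the seed value 42 and the variable names are arbitrary choices for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("sample-sketch").setMaster("local[2]"))

val list = List(1, 2, 3, 4, 5, 6)
val rdd = sc.parallelize(list, 2)

// The same seed on the same RDD yields the same sample
val firstRun = rdd.sample(withReplacement = false, fraction = 0.5, seed = 42L).collect().toList
val secondRun = rdd.sample(withReplacement = false, fraction = 0.5, seed = 42L).collect().toList
println(firstRun == secondRun) // true

// fraction is an expected proportion, not an exact count, so the sample size can vary
println(firstRun.size)
```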

1.7 union

Merge two RDDs:

val list1 = List(1, 2, 3)
val list2 = List(4, 5, 6)
sc.parallelize(list1).union(sc.parallelize(list2)).foreach(println)
// Output: 1 2 3 4 5 6

1.8 intersection

Find the intersection of two RDDs:

val list1 = List(1, 2, 3, 4, 5)
val list2 = List(4, 5, 6)
sc.parallelize(list1).intersection(sc.parallelize(list2)).foreach(println)
// Output: 4 5

1.9 distinct

Remove duplicates:

val list = List(1, 2, 2, 4, 4)
sc.parallelize(list).distinct().foreach(println)
// Output: 4 1 2 (the order may vary)

1.10 groupByKey

Group by key:

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).groupByKey().map(x => (x._1, x._2.toList)).foreach(println)

// Output:
// (spark,List(3, 5))
// (hadoop,List(2, 2))
// (storm,List(6))

1.11 reduceByKey

Reduce by key:

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
sc.parallelize(list).reduceByKey(_ + _).foreach(println)

// Output:
// (spark,8)
// (hadoop,4)
// (storm,6)
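
The note in the operator table recommends reduceByKey over groupByKey when the goal is a per-key aggregation. The sketch below computes the same per-key sum both ways to make the difference concrete. It assumes a local Spark runtime, and the variable names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("reduceByKey-sketch").setMaster("local[2]"))

val list = List(("hadoop", 2), ("spark", 3), ("spark", 5), ("storm", 6), ("hadoop", 2))
val pairs = sc.parallelize(list)

// groupByKey sends every value through the shuffle and sums afterwards
val viaGroup = pairs.groupByKey().mapValues(_.sum).collectAsMap()

// reduceByKey combines values inside each partition before the shuffle
val viaReduce = pairs.reduceByKey(_ + _).collectAsMap()

println(viaGroup == viaReduce) // true
```

Both variants return the same totals; the difference lies in how much data crosses the network, which is why the aggregation-aware operators tend to scale better.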

1.12 sortBy & sortByKey

Sort by key:

val list01 = List((100, "hadoop"), (90, "spark"), (120, "storm"))
// collect() returns the sorted data to the driver, so the lines print in sorted order
sc.parallelize(list01).sortByKey(ascending = false).collect().foreach(println)
// Output:
// (120,storm)
// (100,hadoop)
// (90,spark)

Sort by the specified element:

val list02 = List(("hadoop", 100), ("spark", 90), ("storm", 120))
sc.parallelize(list02).sortBy(x => x._2, ascending = false).collect().foreach(println)
// Output:
// (storm,120)
// (hadoop,100)
// (spark,90)

1.13 join

When called on datasets of type (K, V) and (K, W), join returns a dataset of (K, (V, W)) pairs, which is equivalent to an inner join. For outer joins, use leftOuterJoin, rightOuterJoin, or fullOuterJoin.

val list01 = List((1, "student01"), (2, "student02"), (3, "student03"))
val list02 = List((1, "teacher01"), (2, "teacher02"), (3, "teacher03"))
sc.parallelize(list01).join(sc.parallelize(list02)).foreach(println)

// Output:
// (1,(student01,teacher01))
// (3,(student03,teacher03))
// (2,(student02,teacher02))
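
The outer-join operators mentioned above differ from join in how they treat keys that exist on only one side. The following sketch, assuming a local Spark runtime and using made-up sample data in which keys 1 and 3 each appear on one side only, shows the result of each variant.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("outer-join-sketch").setMaster("local[2]"))

val students = sc.parallelize(List((1, "student01"), (2, "student02")))
val teachers = sc.parallelize(List((2, "teacher02"), (3, "teacher03")))

// The side that may be missing is wrapped in Option; absent keys appear as None.
// Results are sorted on the driver so that the printed order is stable.
val left = students.leftOuterJoin(teachers).collect().sortBy(_._1).toList
val right = students.rightOuterJoin(teachers).collect().sortBy(_._1).toList
val full = students.fullOuterJoin(teachers).collect().sortBy(_._1).toList

left.foreach(println)
// (1,(student01,None))
// (2,(student02,Some(teacher02)))
right.foreach(println)
// (2,(Some(student02),teacher02))
// (3,(None,teacher03))
full.foreach(println)
// (1,(Some(student01),None))
// (2,(Some(student02),Some(teacher02)))
// (3,(None,Some(teacher03)))
```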

1.14 cogroup

When called on datasets of type (K, V) and (K, W), cogroup returns a dataset of tuples of type (K, (Iterable<V>, Iterable<W>)). Several datasets can be cogrouped in one call, as in this example with three RDDs:

val list01 = List((1, "a"), (1, "a"), (2, "b"), (3, "e"))
val list02 = List((1, "A"), (2, "B"), (3, "E"))
val list03 = List((1, "[ab]"), (2, "[bB]"), (3, "eE"), (3, "eE"))
sc.parallelize(list01).cogroup(sc.parallelize(list02), sc.parallelize(list03)).foreach(println)

// Output: the elements within each RDD are first grouped by key, then the groups from the different RDDs are combined by key
// (1,(CompactBuffer(a, a),CompactBuffer(A),CompactBuffer([ab])))
// (3,(CompactBuffer(e),CompactBuffer(E),CompactBuffer(eE, eE)))
// (2,(CompactBuffer(b),CompactBuffer(B),CompactBuffer([bB])))

1.15 cartesian

Calculate the Cartesian product:

val list1 = List("A", "B", "C")
val list2 = List(1, 2, 3)
sc.parallelize(list1).cartesian(sc.parallelize(list2)).foreach(println)

// Output: the Cartesian product
// (A,1)
// (A,2)
// (A,3)
// (B,1)
// (B,2)
// (B,3)
// (C,1)
// (C,2)
// (C,3)

1.16 aggregateByKey

When called on a dataset of (K, V) pairs, aggregateByKey returns a dataset of (K, U) pairs in which the values of each key are aggregated using the given combine functions and zeroValue. As with groupByKey, the number of reduce tasks can be configured through the second parameter, numPartitions. For example:

// For clarity, all arguments below are passed by name
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
sc.parallelize(list, numSlices = 2).aggregateByKey(zeroValue = 0, numPartitions = 3)(
      seqOp = math.max(_, _),
      combOp = _ + _
    ).collect.foreach(println)
// Output:
// (hadoop,3)
// (storm,8)
// (spark,7)

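Here numSlices = 2 sets the number of partitions of parallelize, the parent operation of aggregateByKey, to 2. The computation then runs in two steps. Within each partition, seqOp folds the values of each key starting from zeroValue, which gives (hadoop,3) and (spark,4) for the first partition and (spark,3) and (storm,8) for the second. combOp then adds up the per-partition results of each key, which gives (hadoop,3), (spark,7), and (storm,8).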
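
The two-step flow can be reproduced with plain Scala collections. The sketch below is a simulation rather than Spark code: it assumes that parallelize with numSlices = 2 puts the first three records in one partition and the last three in the other, and all names other than zeroValue are illustrative.

```scala
val list = List(("hadoop", 3), ("hadoop", 2), ("spark", 4), ("spark", 3), ("storm", 6), ("storm", 8))
val zeroValue = 0

// Assumption: two partitions holding three consecutive records each
val partitions: List[List[(String, Int)]] = list.grouped(3).toList

// Step 1 (seqOp): inside each partition, fold the values of a key with math.max
val perPartition: List[Map[String, Int]] = partitions.map { partition =>
  partition.groupBy(_._1).map { case (key, records) =>
    key -> records.map(_._2).foldLeft(zeroValue)((acc, v) => math.max(acc, v))
  }
}

// Step 2 (combOp): merge the per-partition results of each key with +
val merged: Map[String, Int] = perPartition.flatten.groupBy(_._1).map { case (key, partials) =>
  key -> partials.map(_._2).reduce(_ + _)
}

println(merged) // contains hadoop -> 3, spark -> 7, storm -> 8 (print order may vary)
```

Changing the argument of grouped to 6 or 1 mimics a single partition or one record per partition, respectively.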

Following the same execution flow, if numSlices = 1, all the data sits in a single partition, so the final combOp step has nothing to merge and the result is:

(hadoop,3)
(storm,8)
(spark,4)

Likewise, if numSlices = 6, so that each record lands in its own partition, seqOp sees only one value per key in each partition and combOp effectively sums all the values. The result is:

(hadoop,5)
(storm,14)
(spark,7)

The second parameter of aggregateByKey(zeroValue = 0, numPartitions = 3), numPartitions, determines the number of partitions of the output RDD. To verify this, modify the code above and read the partition count with the getNumPartitions method:

sc.parallelize(list, numSlices = 6).aggregateByKey(zeroValue = 0, numPartitions = 3)(
  seqOp = math.max(_, _),
  combOp = _ + _
).getNumPartitions

2. Action

The common Action operators of Spark are as follows:

Action | Meaning
reduce(func) | Reduces the elements of the dataset using the function func.
collect() | Returns all the elements of the dataset as an array; suitable for small result sets.
count() | Returns the number of elements in the dataset.
first() | Returns the first element of the dataset; similar to take(1).
take(n) | Returns the first n elements of the dataset as an array.
takeSample(withReplacement, num, [seed]) | Returns a random sample of num elements of the dataset.
takeOrdered(n, [ordering]) | Returns the first n elements after sorting by natural order or by a custom comparator. Suitable only for small result sets, because the returned elements are all loaded into the driver's memory.
saveAsTextFile(path) | Writes the elements of the dataset as text files to the local file system, HDFS, or any other file system supported by Hadoop. Spark calls toString on each element to convert it to one line of text in the file.
saveAsSequenceFile(path) | Writes the elements of the dataset as a Hadoop SequenceFile to the local file system, HDFS, or any other file system supported by Hadoop. The elements of the RDD must implement Hadoop's Writable interface. In Scala, basic data types are implicitly converted to the corresponding Writable types. (Currently Java and Scala only)
saveAsObjectFile(path) | Stores the elements using Java serialization; the result can be loaded with SparkContext.objectFile(). (Currently Java and Scala only)
countByKey() | Counts the number of occurrences of each key.
foreach(func) | Iterates over the elements of the RDD and runs func on each of them.
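
Several of the simpler actions in the table (collect, count, first, take, takeSample) have no example of their own in this section. The sketch below exercises them on a small RDD. It assumes a local Spark runtime; the data, the seed value, and the variable names are illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("actions-sketch").setMaster("local[2]"))

val rdd = sc.parallelize(List(5, 3, 1, 4, 2), numSlices = 2)

println(rdd.count())               // 5
println(rdd.first())               // 5
println(rdd.take(2).toList)        // List(5, 3)
println(rdd.collect().toList)      // List(5, 3, 1, 4, 2)
println(rdd.takeOrdered(3).toList) // List(1, 2, 3)

// takeSample returns exactly num elements here; which ones depends on the seed
val sampled = rdd.takeSample(withReplacement = false, num = 2, seed = 7L)
println(sampled.length)            // 2
```

Unlike the sample transformation, takeSample is an action: it returns an array to the driver rather than a new RDD.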

2.1 reduce

Use func to perform a reduction:

val list = List(1, 2, 3, 4, 5)
sc.parallelize(list).reduce((x, y) => x + y)
sc.parallelize(list).reduce(_ + _)

// Output: 15

2.2 takeOrdered

Returns the first n elements after sorting by natural order or by a custom comparator. Note that takeOrdered receives the comparator as an implicit parameter, as its source code below shows. To use a custom ordering, implement a custom comparator and bring it into scope as an implicit value.

def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T] = withScope {
  .........
}

Sorting with a custom rule:

// Extend Ordering[T] to implement a custom comparator that sorts by the length of the value
class CustomOrdering extends Ordering[(Int, String)] {
    // Integer.compare returns 0 for equal lengths, as the Ordering contract requires
    override def compare(x: (Int, String), y: (Int, String)): Int =
      Integer.compare(x._2.length, y._2.length)
}

val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))
// Bring the comparator into scope as an implicit value
implicit val implicitOrdering = new CustomOrdering
sc.parallelize(list).takeOrdered(5)

// Output: Array((1,hive), (1,storm), (1,hadoop), (1,azkaban))
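
Since ord is an ordinary parameter in a second parameter list, a comparator can also be passed explicitly instead of being declared implicit. The sketch below takes that route and builds the comparator with Ordering.by from the Scala standard library. It assumes a local Spark runtime, and the name byValueLength is illustrative.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("takeOrdered-sketch").setMaster("local[2]"))

val list = List((1, "hadoop"), (1, "storm"), (1, "azkaban"), (1, "hive"))

// Ordering.by derives a comparator from the length of the second field
val byValueLength = Ordering.by[(Int, String), Int](_._2.length)

// The comparator is supplied explicitly in the second parameter list
val shortest = sc.parallelize(list).takeOrdered(2)(byValueLength).toList
println(shortest) // List((1,hive), (1,storm))
```

Passing the ordering explicitly keeps it out of implicit scope, which can help when several orderings for the same type are in use.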

2.3 countByKey

Count the number of occurrences of each key:

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).countByKey()

// Output: Map(hadoop -> 2, storm -> 2, azkaban -> 1)

2.4 saveAsTextFile

Write the elements in the dataset as text files to the local file system, HDFS, or other file systems supported by Hadoop. Spark calls the toString method on each element, converting the element to a single line in a text file.

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))
sc.parallelize(list).saveAsTextFile("/usr/file/temp")
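
To see what saveAsTextFile actually writes, the output can be read back with textFile. The sketch below does this round trip. It assumes a local Spark runtime that writes to the local file system, and it uses a temporary directory instead of the path above; the directory and variable names are illustrative.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

// Assumption: local mode; getOrCreate reuses an existing context (e.g. in spark-shell)
val sc = SparkContext.getOrCreate(
  new SparkConf().setAppName("saveAsTextFile-sketch").setMaster("local[2]"))

val list = List(("hadoop", 10), ("hadoop", 10), ("storm", 3), ("storm", 3), ("azkaban", 1))

// Assumption: a fresh location; saveAsTextFile fails if the target already exists
val outputDir = Files.createTempDirectory("spark-save-sketch").resolve("out").toString

sc.parallelize(list, numSlices = 2).saveAsTextFile(outputDir)

// The target is a directory with one part file per partition;
// textFile reads all of them back as lines of text
val lines = sc.textFile(outputDir).collect().toList.sorted
lines.foreach(println)
// (azkaban,1)
// (hadoop,10)
// (hadoop,10)
// (storm,3)
// (storm,3)
```

Each line is the toString form of a tuple, so reading the file back yields strings, not the original (String, Int) pairs.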

References

RDD Programming Guide
