Small knowledge, big challenge! This article is participating in the "Essential Tips for Programmers" creation activity.
This article covers the transformation operators of the Spark computing engine for big data
Recommended reading time: 15-20 minutes
Value-type operators
The map operator
Function signatures
def map[U: ClassTag](f: T => U): RDD[U]
Function description
Maps and transforms the data one element at a time; the transformation can change the type of the element or its value.
Demo code
val conf: SparkConf = new SparkConf().setAppName("test").setMaster("local[*]")
val sc = new SparkContext(conf)
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val dataRDD1: RDD[Int] = dataRDD.map(
num => {
num * 2
}
)
val dataRDD2: RDD[String] = dataRDD1.map(
num => {
"" + num
}
)
println(dataRDD1.collect().mkString("\t"))
println(dataRDD2.collect().mkString("\t"))
The first map is a value conversion: each value in the original list is multiplied by 2.
The second map is a type conversion: the type of each value in the list is converted from Int to String.
The results
2	4	6	8
2	4	6	8
The mapPartitions operator
Function signatures
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
Function description
mapPartitions processes data one partition at a time, whereas map processes data one element at a time; mapPartitions is therefore recommended when memory is ample, since a whole partition is held while it is being processed.
Demo code
val listRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val resRDD: RDD[Int] = listRDD.mapPartitions(datas => {
    datas.map(data => data * 2)
})
Advantages: mapPartitions is more efficient than map because it reduces the number of interactions with the executor (one interaction per partition instead of one per element).
Disadvantages: it may cause a memory overflow when a large amount of data pours into a single partition.
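To see why per-partition processing pays off, here is a minimal sketch (names are illustrative, reusing listRDD from above) where an expensive object is built once per partition instead of once per element:
// A minimal sketch: SimpleDateFormat creation stands in for any expensive
// per-partition setup, e.g. a database connection.
val stamped: RDD[String] = listRDD.mapPartitions(iter => {
    val fmt = new java.text.SimpleDateFormat("yyyy-MM-dd") // built once per partition
    iter.map(num => fmt.format(new java.util.Date) + " -> " + num)
})
println(stamped.collect().mkString("\n"))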
The mapPartitionsWithIndex operator
Function signatures
def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T]) => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
Function description
Sends the data to the compute nodes for processing partition by partition, along with the partition index. "Processing" here means any operation may be performed, even filtering the data, and the current partition index is available while processing.
Demo code
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val res: RDD[(Int, String)] = dataRDD.mapPartitionsWithIndex {
    case (num, datas) => {
        datas.map((_, "Partition No. : " + num))
    }
}
println(res.collect().mkString("\n"))
val dataRDD1: RDD[(Int, Int)] = dataRDD.mapPartitionsWithIndex(
    (index, datas) => {
        datas.map((index, _))
    }
)
The results
(1,Partition No. : 0)
(2,Partition No. : 0)
(3,Partition No. : 1)
(4,Partition No. : 1)
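Since the function receives the partition index, it can also filter by partition, as the description above mentions; a minimal sketch that keeps only the data of partition 1:
// A minimal sketch: keep only the elements that live in partition 1.
val onlyPartition1: RDD[Int] = dataRDD.mapPartitionsWithIndex(
    (index, datas) => if (index == 1) datas else Iterator.empty
)
println(onlyPartition1.collect().mkString("\t")) // 3	4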
The flatMap operator
Function signatures
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U]
Function description
map: each input element produces exactly one new element.
flatMap: each input element can produce zero, one, or more new elements (possibly more than the original). flatMap first maps each element to a collection and then flattens the results, which is why the operator is also called a flat map.
Demo code
val lineArray = Array("hello you"."hello me"."hello world")
val lines = sc.parallelize(lineArray, 1)
val res1: RDD[Array[String]] = lines.map(_.split(""))
val res2: RDD[String] = lines.flatMap(_.split(""))
println("map:")
res1.foreach(
line=> println(line.mkString("\t"))
)
println()
println("flatMap:")
res2.foreach(
line=> println(line.mkString("\t")))Copy the code
The results
map:
hello you
hello me
hello world
flatMap:
h e l l o
y o u
h e l l o
m e
h e l l o
w o r l d
The glom operator
Function signatures
def glom(): RDD[Array[T]]
Function description
Converts the data in each partition directly into an in-memory array of the same element type; the partitioning stays unchanged. This is suitable for statistics computed per partition.
Demo code
val rdd: RDD[Int] = sc.makeRDD(List(1, 5, 6, 9, 8, 5, 6, 2, 4, 65, 2), 4)
val arrayrdd: RDD[Array[Int]] = rdd.glom()
arrayrdd.collect().foreach(array => {
    println(array.max)
})
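Building on the per-partition statistics idea, the partition maxima can then be combined on the driver, for example summed (a sketch reusing rdd from above):
// A minimal sketch: take the max of each partition, then sum the maxima.
val sumOfMaxes: Int = rdd.glom().map(_.max).collect().sum
println(sumOfMaxes)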
The groupBy operator
Function signatures
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])]
Function description
Groups the data by the return value of the passed function; elements that map to the same key are placed into the same iterator.
Demo code
val rdd: RDD[Int] = sc.makeRDD(List(1, 5, 6, 9, 8, 5, 6, 2, 4, 65, 2))
val grouprdd: RDD[(Int, Iterable[Int])] = rdd.groupBy(i => i % 2)
grouprdd.foreach(println)
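The grouping key can be anything the function returns; for instance, a minimal sketch grouping words by their first character (reusing sc from the first demo):
// A minimal sketch: group words by their first letter.
val words: RDD[String] = sc.makeRDD(List("spark", "scala", "hadoop", "hive"))
val byFirstChar: RDD[(Char, Iterable[String])] = words.groupBy(word => word.charAt(0))
byFirstChar.collect().foreach(println) // one (s, ...) group and one (h, ...) group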
The filter operator
Function signatures
def filter(f: T => Boolean): RDD[T]
Function description
Filters the data according to the specified rule: elements that satisfy the rule are kept, the rest are discarded. After filtering, the number of partitions stays the same, but the data in the partitions may become unbalanced; in a production environment this can lead to data skew.
Demo code
val dataRDD = sparkContext.makeRDD(List(
    1, 2, 3, 4
), 1)
val dataRDD1 = dataRDD.filter(_ % 2 == 0)
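To make the imbalance point concrete, here is a minimal sketch that inspects per-partition sizes after a filter, using glom:
// A minimal sketch: filter keeps the partitioning, so partition sizes can
// become very uneven. Elements 1..4 fall into the first of the two slices,
// so this prints 4 and then 0.
val skewed: RDD[Int] = sparkContext.makeRDD(1 to 8, 2).filter(_ <= 4)
skewed.glom().collect().foreach(part => println(part.length))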
The sample operator
Function signatures
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
Function description
Extract data from a data set according to the specified rules
Demo code
val dataRDD = sparkContext.makeRDD(List(
    1, 2, 3, 4
), 1)
// Without replacement: each element kept with probability 0.5
val dataRDD1 = dataRDD.sample(false, 0.5)
// With replacement: each element expected to be drawn about 2 times
val dataRDD2 = dataRDD.sample(true, 2)
Extension:
Sampling without replacement (Bernoulli algorithm)
Bernoulli algorithm: also known as the 0-1 distribution, like flipping a coin: either heads or tails.
Implementation: a number is computed from the seed by a random algorithm and compared with the probability given as the second parameter; if it is less than the second parameter the element is selected, otherwise it is not.
First parameter: whether the sampled data is put back; false: not put back.
Second parameter: the probability of selecting each element, in the range [0, 1]; 0: select none; 1: select all.
Third parameter: the random number seed.
Sampling with replacement (Poisson algorithm)
First parameter: whether the sampled data is put back; true: put back; false: not put back.
Second parameter: the expected number of times each element is selected; must be greater than or equal to 0.
Third parameter: the random number seed.
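Because sampling is random, fixing the seed (the third parameter) makes runs reproducible; a minimal sketch reusing dataRDD from above:
// A minimal sketch: with the same seed, the same elements are drawn each run.
val sampled1 = dataRDD.sample(withReplacement = false, fraction = 0.5, seed = 42L)
val sampled2 = dataRDD.sample(withReplacement = false, fraction = 0.5, seed = 42L)
println(sampled1.collect().mkString(", ")) // identical to sampled2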
The distinct operator
Function signatures
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Function description
De-duplicates the data; it is called directly on the RDD that needs de-duplication.
Demo code
val lineArray = Array("hello you"."hello me"."hello world"."hello me")
val lines = sc.parallelize(lineArray, 1)
val value: RDD[String] = lines.distinct()
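For intuition, distinct is essentially a map → reduceByKey → map pipeline (this mirrors the classic implementation in the Spark source; shown here only to illustrate, reusing lines from above):
// A sketch of what distinct does internally: pair each element with null,
// reduce by key to collapse duplicates, then keep only the keys.
val deduplicated: RDD[String] = lines
    .map(x => (x, null))
    .reduceByKey((x, _) => x)
    .map(_._1)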
The coalesce and repartition operators
Function signatures
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T]
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
Function description
repartition actually invokes coalesce with shuffle set to true; with coalesce itself, shuffling is optional (off by default).
In development (let N be the current number of partitions and M the target number):
1) If N < M:
Generally the data in the N partitions is unevenly distributed, and Spark uses the HashPartitioner to redistribute it into M partitions. In this case shuffle must be set to true (use repartition; coalesce without a shuffle cannot do this).
2) If N > M and N and M are close:
Set shuffle to false (use coalesce). Note that if M were greater than N, coalesce would be ineffective and no shuffle would take place: the parent and child RDD have a narrow dependency, so the number of partitions cannot grow. In other words, with shuffle = false, passing a number larger than the current partition count leaves the partition count unchanged; partitions cannot be increased without a shuffle.
3) If N > M and they differ greatly:
If the number of executors is less than or equal to the number of target partitions, coalesce is efficient. Otherwise, coalesce leaves (number of executors − number of target partitions) executors idle, reducing efficiency. If M is 1, shuffle can be set to true so that the operations performed before the coalesce keep better parallelism. A demo is shown below.
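This section has no demo above, so here is a minimal sketch (names are illustrative, reusing sc from the first demo) showing both directions; getNumPartitions reports the partition count:
// A minimal sketch: shrink partitions with coalesce (no shuffle by default),
// grow them back with repartition (which always shuffles).
val rdd6: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 6)
val shrunk: RDD[Int] = rdd6.coalesce(2)      // 6 -> 2, narrow dependency
val grown: RDD[Int] = shrunk.repartition(4)  // 2 -> 4, shuffle = true internally
println(shrunk.getNumPartitions)             // 2
println(grown.getNumPartitions)              // 4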
The sortBy operator
Function signatures
def sortBy[K](
f: (T) => K,
ascending: Boolean = true,
numPartitions: Int = this.partitions.length)
(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T]
Function description
This operation sorts the data. Before sorting, the data can be processed by the function f, and the sorting is then performed on the results of f, in ascending order by default. By default the new RDD has the same number of partitions as the original RDD. For descending order, set the second argument to false.
Demo code
val dataRDD = sparkContext.makeRDD(List(
    1, 2, 3, 4, 1, 2
), 2)
val dataRDD1 = dataRDD.sortBy(num => num, false, 4)
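sortBy also works on structured data through the key-extracting function; a minimal sketch sorting (word, count) pairs by count in descending order:
// A minimal sketch: sort pairs by their second field, descending.
val pairs: RDD[(String, Int)] = sparkContext.makeRDD(List(("a", 3), ("b", 1), ("c", 2)))
val sorted: RDD[(String, Int)] = pairs.sortBy(pair => pair._2, ascending = false)
println(sorted.collect().mkString(", ")) // (a,3), (c,2), (b,1)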
The partitionBy operator
Function signatures
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
Function description
Repartitions the data according to the specified Partitioner. Spark's default partitioner is the HashPartitioner.
HashPartitioner: takes the key's hashCode modulo the number of partitions; this is the default.
RangePartitioner: requires the keys to be ordered (comparable), so it is less widely applicable.
Custom partitioner: shown in the demo below.
Demo code
val listRDD: RDD[(String, String)] = sc.makeRDD(
    List(
        ("cba", "1"), ("nba", "2"), ("nba", "3"),
        ("cba", "4"), ("cba", "5"), ("wnba", "6")
    ), 1
)
val rdd1: RDD[(String, String)] = listRDD.partitionBy(new MyPartition(2))
val rdd2: RDD[(Int, (String, String))] = rdd1.mapPartitionsWithIndex(
    (index, datas) =>
        datas.map(
            data => (index, data)
        )
)
rdd2.collect().foreach(println)
sc.stop()
// Inherit Partitioner
class MyPartition(num: Int) extends Partitioner {
    // The number of partitions
    override def numPartitions: Int = num
    // Decide which partition each record goes to, based on its key
    // The method returns a partition number (index)
    override def getPartition(key: Any): Int = {
        key match {
            case "nba" => 0
            case _ => 1
        }
    }
}
Note: if the partitioner used for repartitioning equals the RDD's current partitioner, nothing is done and the data is not repartitioned.
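For comparison with the custom partitioner, the built-in HashPartitioner can be passed in the same way; a minimal sketch reusing listRDD from the demo above:
import org.apache.spark.HashPartitioner

// A minimal sketch: the built-in HashPartitioner sends each key to
// hashCode(key) mod numPartitions.
val hashed: RDD[(String, String)] = listRDD.partitionBy(new HashPartitioner(2))
println(hashed.getNumPartitions) // 2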
That's all for this article. There will be follow-up articles in this series; if you are interested, please stay tuned!