1. RDD creation
You can use the following methods to create an RDD on Spark:
- Create an RDD from a collection (in memory)
- To create an RDD from a collection, Spark provides two main methods: parallelize and makeRDD
- makeRDD simply calls parallelize under the hood, so the two methods are equivalent
// Create an RDD from an in-memory collection
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

def main(args: Array[String]): Unit = {
  val sc: SparkContext = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName("Rdd-Mem"))
  val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 4, 5, 6))
  val rdd2: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6))
  rdd1.collect().foreach(println)
  rdd2.collect().foreach(println)
}
// makeRDD source code
def makeRDD[T: ClassTag](
    seq: Seq[T],
    numSlices: Int = defaultParallelism): RDD[T] = withScope {
  parallelize(seq, numSlices)
}
- Create an RDD from external storage (files)
- RDDs can also be created from datasets in external storage systems, including the local file system and any storage supported by Hadoop, such as HDFS and HBase.
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local")
      .setAppName("Rdd-File"))
  // textFile returns one record per line of the input files
  val rdd1: RDD[String] = sc.textFile("data")
  // wholeTextFiles returns (file path, full file content) tuples
  val rdd2: RDD[(String, String)] = sc.wholeTextFiles("data/word*.txt")
  rdd1.collect().foreach(println)
  rdd2.collect().foreach(println)
}
- Create an RDD from another RDD
- Applying a transformation to an existing RDD produces a new RDD; the following sections cover this in detail, and a short sketch follows this list.
- Create an RDD directly with new
- Constructing an RDD directly with new is generally only done inside the Spark framework itself.
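As a quick illustration of creating an RDD from another RDD, here is a minimal sketch (my own example, assuming the same local SparkContext named sc as in the snippets above): applying a transformation such as map does not modify the source RDD but returns a new one.

// Sketch: creating an RDD from another RDD via a transformation
// (assumes a local SparkContext named sc, as in the earlier examples)
val sourceRdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val doubledRdd: RDD[Int] = sourceRdd.map(_ * 2) // a brand-new RDD; sourceRdd is unchanged
doubledRdd.collect().foreach(println) // 2, 4, 6, 8, 10, 12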
2. RDD parallelism and partitioning
By default, Spark splits a job into multiple tasks and sends them to Executor nodes for parallel computation. The number of tasks that can be computed in parallel is called the parallelism, and it can be specified when building an RDD. Note that the number of parallel tasks is not the same as the number of split tasks; don't confuse the two.
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("spark")
val sc = new SparkContext(sparkConf)
// The second argument of makeRDD sets the number of partitions
val dataRDD: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 4)
// The second argument of textFile sets the minimum number of partitions
val fileRDD: RDD[String] = sc.textFile("input", 2)
fileRDD.collect().foreach(println)
sc.stop()
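To verify the parallelism that actually results, one option (a sketch assuming the dataRDD and fileRDD defined above) is to ask each RDD for its partition count, or to write the RDD out and count the part files:

// Sketch: inspecting partition counts (assumes dataRDD and fileRDD from above)
println(dataRDD.getNumPartitions) // 4, as requested in makeRDD
println(fileRDD.getNumPartitions) // at least 2; textFile's second argument is only a minimum
// saveAsTextFile writes one part-XXXXX file per partition ("output" is a hypothetical path)
dataRDD.saveAsTextFile("output")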
- When reading in-memory data, the data is partitioned according to the configured parallelism. The core Spark source code is as follows (a short sketch after the source illustrates the result):
def slice[T: ClassTag](seq: Seq[T], numSlices: Int): Seq[Seq[T]] = {
  if (numSlices < 1) {
    throw new IllegalArgumentException("Positive number of partitions required")
  }
  // Sequences need to be sliced at the same set of index positions for operations
  // like RDD.zip() to behave as expected
  // Compute the start and end position of each partition,
  // e.g. [1,2,3,4,5] split into two partitions becomes [1,2] and [3,4,5]
  def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {
    (0 until numSlices).iterator.map { i =>
      val start = ((i * length) / numSlices).toInt
      val end = (((i + 1) * length) / numSlices).toInt
      (start, end)
    }
  }
  // The actual slicing logic
  seq match {
    case r: Range =>
      positions(r.length, numSlices).zipWithIndex.map { case ((start, end), index) =>
        // If the range is inclusive, use inclusive range for the last slice
        if (r.isInclusive && index == numSlices - 1) {
          new Range.Inclusive(r.start + start * r.step, r.end, r.step)
        }
        else {
          new Range(r.start + start * r.step, r.start + end * r.step, r.step)
        }
      }.toSeq.asInstanceOf[Seq[Seq[T]]]
    case nr: NumericRange[_] =>
      // For ranges of Long, Double, BigInteger, etc
      val slices = new ArrayBuffer[Seq[T]](numSlices)
      var r = nr
      for ((start, end) <- positions(nr.length, numSlices)) {
        val sliceSize = end - start
        slices += r.take(sliceSize).asInstanceOf[Seq[T]]
        r = r.drop(sliceSize)
      }
      slices
    case _ =>
      val array = seq.toArray // To prevent O(n^2) operations for List etc
      positions(array.length, numSlices).map { case (start, end) =>
        array.slice(start, end).toSeq
      }.toSeq
  }
}
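As a concrete illustration of the positions() calculation above (my own sketch, not Spark source): for a five-element list split into two slices, positions(5, 2) yields (0, 2) and (2, 5). glom() can be used to materialize each partition as an array and make the boundaries visible.

// Sketch: observing how makeRDD splits an in-memory collection (assumes a local sc)
val rdd = sc.makeRDD(List(1, 2, 3, 4, 5), 2)
// glom() turns each partition into an Array so its contents can be printed
rdd.glom().collect().foreach(arr => println(arr.mkString("[", ",", "]")))
// Expected output:
// [1,2]
// [3,4,5]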
- When reading file data, the data is sliced and partitioned according to Hadoop's file-reading rules; the slicing rules differ somewhat from the data-reading rules. The core Spark source code is as follows (a small worked example of the split-size formula follows the source):
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  SparkHadoopUtil.get.addCredentials(jobConf)
  try {
    // Compute the input splits (partitions)
    val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
    ..........
// How the splits are computed
public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
  long totalSize = 0;
  for (FileStatus file: files) {
    if (file.isDirectory()) {
      throw new IOException("Not a file: " + file.getPath());
    }
    totalSize += file.getLen();
  }
  long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
  long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
    FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);
  ...
  for (FileStatus file: files) {
    ...
    if (isSplitable(fs, path)) {
      long blockSize = file.getBlockSize();
      long splitSize = computeSplitSize(goalSize, minSize, blockSize);
      ...
    }
  }
  ...
}

protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
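To make the split-size formula concrete, here is a small sketch with hypothetical numbers (a 1000-byte input read with minPartitions = 2; the values are illustrative, not taken from the source):

// Sketch of computeSplitSize with made-up numbers
val totalSize = 1000L                  // total bytes of the input files (hypothetical)
val goalSize  = totalSize / 2          // numSplits = 2  ->  500 bytes
val minSize   = 1L                     // default minimum split size
val blockSize = 128L * 1024 * 1024     // typical HDFS block size (128 MB)
// splitSize = max(minSize, min(goalSize, blockSize)) = 500 bytes
val splitSize = math.max(minSize, math.min(goalSize, blockSize))
println(splitSize) // 500 -> the 1000-byte input ends up in roughly 1000 / 500 = 2 splits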
3. RDD serialization
- Closure check
From a computation standpoint, all code outside an operator runs on the Driver side, while all code inside an operator runs on the Executor side. In Scala's functional style, code inside an operator often uses data defined outside it, which forms a closure. If that outside data cannot be serialized, it cannot be shipped to the Executor side and the job fails with an error. Therefore, before a task is executed, Spark checks whether the objects captured by the closure can be serialized; this is called the closure check. Note that the way closures are compiled changed in Scala 2.12.
- Serialization methods and properties
From a computation standpoint, all code outside an operator runs on the Driver side and all code inside an operator runs on the Executor side, as in the following example:
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf().setMaster("local[*]").setAppName("Test serialization"))
  // dept1 and dept0 are created on the Driver side but used inside the map operator,
  // so the Dept class must be serializable
  val dept1 = new Dept(1, "R&D Department")
  val dept0 = new Dept(0, "Unknown")
  val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 1),
    ("b", 4), ("F", 5), ("K", 6)))
  rdd.map(t => {
    t._2 match {
      case 1 => (t._1, dept1)
      case _ => (t._1, dept0)
    }
  }).collect().foreach(println)
}

class Dept(var id: Int, var name: String) extends Serializable {
  override def toString: String = id + "\t" + name
}
// The closure-check source code
private def clean(
    func: AnyRef,
    checkSerializable: Boolean,
    cleanTransitively: Boolean,
    accessedFields: Map[Class[_], Set[String]]): Unit = {
  ...
  // Verify serializability
  if (checkSerializable) {
    ensureSerializable(func)
  }
}
private def ensureSerializable(func: AnyRef): Unit = {
  try {
    if (SparkEnv.get != null) {
      SparkEnv.get.closureSerializer.newInstance().serialize(func)
    }
  } catch {
    case ex: Exception =>
      throw new SparkException("Task not serializable", ex)
  }
}

// If a class used in the closure does not implement the Serializable interface,
// the job fails with the following exception:
// Exception in thread "main" org.apache.spark.SparkException: Task not serializable
//   at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
//   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
//   at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
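For reference, a minimal sketch (my own example; the User class is made up) of code that triggers the exception above by capturing a non-serializable object inside an operator:

// Hypothetical class that does NOT implement Serializable
class User(var name: String)

val user = new User("zhangsan")   // created on the Driver side
val rdd = sc.makeRDD(List(1, 2, 3))
// 'user' is captured by the closure below; because User is not serializable,
// the closure check throws: org.apache.spark.SparkException: Task not serializable
rdd.map(i => (user.name, i)).collect()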
- Kryo serialization framework
Project address: github.com/EsotericSof… Java serialization can serialize any class, but it is heavyweight: the serialized representation of an object is large. For performance reasons, Spark began supporting another serialization mechanism, Kryo, in Spark 2.0. Kryo is roughly 10 times faster than Java serialization. When an RDD shuffles data, simple value types, arrays, and strings are already serialized with Kryo inside Spark. Note: even when Kryo serialization is used, the class still needs to implement the Serializable interface.
def main(args: Array[String]): Unit = {
  val sc = new SparkContext(
    new SparkConf()
      .setMaster("local[*]")
      .setAppName("Test serialization")
      // Replace the default Java serializer with Kryo
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      // Register the custom classes to be serialized with Kryo
      .registerKryoClasses(Array(classOf[Dept])))
  val dept1 = new Dept(1, "R&D Department")
  val dept0 = new Dept(0, "Unknown")
  val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 1),
    ("b", 4), ("F", 5), ("K", 6)))
  rdd.map(t => {
    t._2 match {
      case 1 => (t._1, dept1)
      case _ => (t._1, dept0)
    }
  }).collect().foreach(println)
}

class Dept(var id: Int, var name: String) extends Serializable {
  override def toString: String = id + "\t" + name
}