1. Create RDD

1. Create an RDD from a collection

Spark provides two main functions for this: parallelize() and makeRDD().

// 1. Create an RDD with parallelize()
val rdd: RDD[Int] = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8))
println("Partition number: " + rdd.partitions.size)
// 2. Create an RDD with makeRDD()
val rdd1: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))

2. Create an RDD from an external data set

This covers the local file system and any data set supported by Hadoop, such as HDFS, HBase, etc.

// Read a file. On a cluster the path would be an HDFS URI, e.g. hdfs://hadoop102:9000/input
val lineWordRdd: RDD[String] = sc.textFile("input")
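textFile also accepts a minPartitions argument; a quick sketch (the partition rules are discussed in the next section):

// Ask for at least 4 partitions when reading the file
val lineWordRdd4: RDD[String] = sc.textFile("input", minPartitions = 4)
println("Partition number: " + lineWordRdd4.partitions.size)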

3. Create an RDD from another RDD

Applying a transformation to an existing RDD produces a new RDD.
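For example, a minimal sketch that derives a new RDD from the lineWordRdd created above:

// flatMap is a transformation: it returns a new RDD of words built from the line RDD
val wordRdd: RDD[String] = lineWordRdd.flatMap(_.split(" "))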

2. Partition rules

1. Default partition rules (RDD created from a collection)

Conclusion:

  • If the number of partitions is specified, that number is used (numSlices; looking at the source code, numSlices is the number of slices you can pass in, and the CPU core count is not consulted)
  • If the number of partitions is not specified, the partition number is set from the number of CPU cores available to the application (see the sketch below)
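A minimal sketch of both cases (assuming a local SparkContext such as local[*]):

// Partition count follows the explicitly specified numSlices
val rdd4: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8), numSlices = 4)
println(rdd4.partitions.size)       // 4

// Without numSlices, the partition count falls back to the default parallelism
val rddDefault: RDD[Int] = sc.makeRDD(Array(1, 2, 3, 4, 5, 6, 7, 8))
println(rddDefault.partitions.size) // typically the number of CPU cores available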

2. Partition rules for an RDD created from external files

Conclusion: the default minimum partition number is math.min(defaultParallelism, 2), where defaultParallelism depends on the number of CPU cores allocated to the application.

The source code is as follows:

def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}
TextInputFormat reads the file line by line with a LineRecordReader. minPartitions is the minimum number of partitions; if it is not specified:

def defaultMinPartitions: Int = math.min(defaultParallelism, 2)

override def defaultParallelism(): Int =
  scheduler.conf.getInt("spark.default.parallelism", totalCores)
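Both defaults can be checked directly on the SparkContext, e.g.:

println("defaultParallelism: " + sc.defaultParallelism)
println("defaultMinPartitions: " + sc.defaultMinPartitions) // math.min(defaultParallelism, 2)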

Slice planning: note that getSplits only returns the slice plan; the data is actually read by the LineRecordReader created in the compute method, which uses two key variables:

start = split.getStart()
end = start + split.getLength()
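To make this concrete, a tiny sketch (plain Scala, not the Spark/Hadoop API) of how each slice maps to a byte range of the file:

// Hypothetical model of a slice: mirrors start = split.getStart(); end = start + split.getLength()
final case class SliceRange(start: Long, length: Long) {
  val end: Long = start + length
}

val slices = Seq(SliceRange(0, 64), SliceRange(64, 64), SliceRange(128, 30))
slices.foreach(s => println(s"this task reads bytes [${s.start}, ${s.end})"))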

3. Create an RDD from another RDD

The new RDD generally inherits the number of partitions of the parent RDD.
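A quick check (reusing the rdd created from the collection earlier): a narrow transformation such as map keeps the parent's partition count.

val mappedRdd: RDD[Int] = rdd.map(_ + 1)
println(mappedRdd.partitions.size == rdd.partitions.size) // true: map preserves the partition count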

3. Detailed source code of the slicing rules (to be supplemented later)

1. Reading files to create an RDD: analyzing partitions and slices

  1. The textFile method:
  • minPartitions: the minimum number of partitions; if it is not specified it defaults to math.min(totalCores, 2)
// 1. textFile: the method for reading text files
def textFile(
    path: String,
    minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
  assertNotStopped()
  hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
    minPartitions).map(pair => pair._2.toString).setName(path)
}

//2. hadoopFile
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {
  assertNotStopped()

  // This is a hack to enforce loading hdfs-site.xml.
  // See SPARK-11227 for details.
  FileSystem.getLocal(hadoopConfiguration)

  // A Hadoop configuration can be about 10 KB, which is pretty big, so broadcast it.
  val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))
  val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)
  new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions).setName(path)
}

//3. The HadoopRDD that actually reads the text
 new HadoopRDD(
    this,
    confBroadcast,
    Some(setInputPathsFunc),
    inputFormatClass,
    keyClass,
    valueClass,
    minPartitions)
  1. TextInputFormat: handles plain text files; the slicing logic lives in its parent class, FileInputFormat
/**
 * An {@link InputFormat} for plain text files. Files are broken into lines.
 * Either linefeed or carriage-return are used to signal end of line. Keys are
 * the position in the file, and values are the line of text.
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class TextInputFormat extends FileInputFormat<LongWritable, Text>
  1. Getting the partition information; the slice information is carried by the partition object HadoopPartition
//1. Obtain the partition number
override def getPartitions: Array[Partition] = {
  val jobConf = getJobConf()
  // add the credentials here as this can be called before SparkContext initialized
  SparkHadoopUtil.get.addCredentials(jobConf)
  val inputFormat = getInputFormat(jobConf)
  // Get the slice plan
  val inputSplits = inputFormat.getSplits(jobConf, minPartitions)
  val array = new Array[Partition](inputSplits.size)
  // Partition number = slice number
  for (i <- 0 until inputSplits.size) {
    array(i) = new HadoopPartition(id, i, inputSplits(i))
  }
  array
}

// Slice the object
public FileSplit(Path file, long start, long length, String[] hosts) {
    this.file = file;	// Specifies the file path
    this.start = start;  // Slice the starting position of the read file
    this.length = length; // Read the length
    this.hosts = hosts; // The server where the file resides
  }
 
  1. Specific slice planning
  • Key methods for calculating slice size:

    Math.max(minSize, Math.min(goalSize, blockSize));

    1. If the configured minimum slice size (minSize) is larger than min(goalSize, blockSize), the slice size is minSize
    2. Otherwise the slice size is the smaller of goalSize and blockSize

minSize: the minimum slice size; it can be set in the configuration (mapreduce.input.fileinputformat.split.minsize) and defaults to 1

goalSize: the target slice size = total file size / minimum partition number (minPartitions)

blockSize: the block size of the HDFS file
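A small sketch of this formula (plain Scala with made-up numbers, not the Hadoop API itself):

// Mirrors FileInputFormat.computeSplitSize: Math.max(minSize, Math.min(goalSize, blockSize))
def computeSplitSize(goalSize: Long, minSize: Long, blockSize: Long): Long =
  math.max(minSize, math.min(goalSize, blockSize))

// e.g. a 300-byte file read with minPartitions = 2 on a 128 MB block size
val goalSize  = 300L / 2                                  // 150
val splitSize = computeSplitSize(goalSize, 1L, 128L * 1024 * 1024)
println(splitSize)                                        // 150 -> the file is planned into 2 slices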

  • Key code for file planning:

The loop keeps carving off slices while the remaining bytes exceed 1.1 times the slice size (SPLIT_SLOP), so the last slice may be larger than the slice size, but never more than 1.1 times it. A worked example follows the code excerpt below.

// SPLIT_SLOP = 1.1
while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
  String[] splitHosts = getSplitHosts(blkLocations,
      length-bytesRemaining, splitSize, clusterMap);
  splits.add(makeSplit(path, length-bytesRemaining, splitSize,
      splitHosts));
  bytesRemaining -= splitSize;
}

if (bytesRemaining != 0) {
  String[] splitHosts = getSplitHosts(blkLocations, length
      - bytesRemaining, bytesRemaining, clusterMap);
  splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
      splitHosts));
}
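A worked sketch of this loop (plain Scala, not the Hadoop API; the file lengths and splitSize are made up for illustration):

val SPLIT_SLOP = 1.1
// Returns the planned slices as (offset, length) pairs
def planSlices(length: Long, splitSize: Long): Seq[(Long, Long)] = {
  var bytesRemaining = length
  val splits = scala.collection.mutable.ArrayBuffer.empty[(Long, Long)]
  while (bytesRemaining.toDouble / splitSize > SPLIT_SLOP) {
    splits += ((length - bytesRemaining, splitSize))
    bytesRemaining -= splitSize
  }
  if (bytesRemaining != 0) splits += ((length - bytesRemaining, bytesRemaining))
  splits.toSeq
}

println(planSlices(500, 150)) // 4 slices: (0,150), (150,150), (300,150), (450,50)
println(planSlices(160, 150)) // 1 slice: (0,160) -- 160 <= 1.1 * 150, so the last slice may exceed splitSize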

Complete code:

//4. Hadoop's getSplits computes the slice plan
  /** Splits files returned by {@link #listStatus(JobConf)} when they're too big. */
  public InputSplit[] getSplits(JobConf job, int numSplits)
    throws IOException {
    FileStatus[] files = listStatus(job);
    
    // Save the number of input files for metrics/loadgen
    job.setLong(NUM_INPUT_FILES, files.length);
    long totalSize = 0;                           // compute total size
    for (FileStatus file: files) {                // check we have valid files
      if (file.isDirectory()) {
        throw new IOException("Not a file: "+ file.getPath());
      }
      totalSize += file.getLen();
    }
	// Target slice size = total file size/minimum partition number
    long goalSize = totalSize / (numSplits == 0 ? 1 : numSplits);
    // Minimum slice size, default is 1
    long minSize = Math.max(job.getLong(org.apache.hadoop.mapreduce.lib.input.
      FileInputFormat.SPLIT_MINSIZE, 1), minSplitSize);

    // generate splits
    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
    NetworkTopology clusterMap = new NetworkTopology();
    for (FileStatus file: files) {
      Path path = file.getPath();
      long length = file.getLen();
      if (length != 0) {
        FileSystem fs = path.getFileSystem(job);
        BlockLocation[] blkLocations;
        if (file instanceof LocatedFileStatus) {
          blkLocations = ((LocatedFileStatus) file).getBlockLocations();
        } else {
          blkLocations = fs.getFileBlockLocations(file, 0, length);
        }
        if (isSplitable(fs, path)) {
        // File block size
          long blockSize = file.getBlockSize();
       //computeSplitSize: specifies the logic used to calculate the slice size
       //Math.max(minSize, Math.min(goalSize, blockSize));
          long splitSize = computeSplitSize(goalSize, minSize, blockSize);

          long bytesRemaining = length;
          // Generate an array of slice planning files
          while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
            String[] splitHosts = getSplitHosts(blkLocations,
                length-bytesRemaining, splitSize, clusterMap);
            splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                splitHosts));
            bytesRemaining -= splitSize;
          }

          if (bytesRemaining != 0) {
            String[] splitHosts = getSplitHosts(blkLocations, length
                - bytesRemaining, bytesRemaining, clusterMap);
            splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                splitHosts));
          }
        } else {
          String[] splitHosts = getSplitHosts(blkLocations, 0, length, clusterMap);
          splits.add(makeSplit(path, 0, length, splitHosts));
        }
      } else {
        // Create empty hosts array for zero length files
        splits.add(makeSplit(path, 0, length, new String[0]));
      }
    }
    LOG.debug("Total # of splits: " + splits.size());
    return splits.toArray(new FileSplit[splits.size()]);
  }
  
  // 5. compute reads the records of a slice
  override def compute(theSplit: Partition, context: TaskContext): InterruptibleIterator[(K, V)] = {
    val iter = new NextIterator[(K, V)] {

      private val split = theSplit.asInstanceOf[HadoopPartition]
      logInfo("Input split: " + split.inputSplit)
      private val jobConf = getJobConf()

      private val inputMetrics = context.taskMetrics().inputMetrics
      private val existingBytesRead = inputMetrics.bytesRead

      // Sets the thread local variable for the file's name
      split.inputSplit.value match {
        case fs: FileSplit => InputFileNameHolder.setInputFileName(fs.getPath.toString)
        case _ => InputFileNameHolder.unsetInputFileName()
      }

      // Find a function that will return the FileSystem bytes read by this thread. Do this before
      // creating RecordReader, because RecordReader's constructor might read some bytes
      private val getBytesReadCallback: Option[() => Long] = split.inputSplit.value match {
        case _: FileSplit | _: CombineFileSplit =>
          SparkHadoopUtil.get.getFSBytesReadOnThreadCallback()
        case _ => None
      }

      // For Hadoop 2.5+, we get our input bytes from thread-local Hadoop FileSystem statistics.
      // If we do a coalesce, however, we are likely to compute multiple partitions in the same
      // task and in the same thread, in which case we need to avoid override values written by
      // previous partitions (SPARK-13071).
      private def updateBytesRead(): Unit = {
        getBytesReadCallback.foreach { getBytesRead =>
          inputMetrics.setBytesRead(existingBytesRead + getBytesRead())
        }
      }

      private var reader: RecordReader[K, V] = null
      private val inputFormat = getInputFormat(jobConf)
      HadoopRDD.addLocalConfiguration(
        new SimpleDateFormat("yyyyMMddHHmmss".Locale.US).format(createTime),
        context.stageId, theSplit.index, context.attemptNumber, jobConf)

      reader =
        try {
          inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL)
        } catch {
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
            null
        }
      // Register an on-task-completion callback to close the input stream.
      context.addTaskCompletionListener{ context => closeIfNeeded() }
      private val key: K = if (reader == null) null.asInstanceOf[K] else reader.createKey()
      private val value: V = if (reader == null) null.asInstanceOf[V] else reader.createValue()

      override def getNext(): (K, V) = {
        try {
          finished = !reader.next(key, value)
        } catch {
          case e: IOException if ignoreCorruptFiles =>
            logWarning(s"Skipped the rest content in the corrupted file: ${split.inputSplit}", e)
            finished = true
        }
        if (!finished) {
          inputMetrics.incRecordsRead(1)
        }
        if (inputMetrics.recordsRead % SparkHadoopUtil.UPDATE_INPUT_METRICS_INTERVAL_RECORDS == 0) {
          updateBytesRead()
        }
        (key, value)
      }

      override def close() {
        if (reader != null) {
          InputFileNameHolder.unsetInputFileName()
          // Close the reader and release it. Note: it's very important that we don't close the
          // reader more than once, since that exposes us to MAPREDUCE-5918 when running against
          // Hadoop 1.x and older Hadoop 2.x releases. That bug can lead to non-deterministic
          // corruption issues when reading compressed input.
          try {
            reader.close()
          } catch {
            case e: Exception =>
              if (!ShutdownHookManager.inShutdown()) {
                logWarning("Exception in RecordReader.close()", e)
              }
          } finally {
            reader = null
          }
          if (getBytesReadCallback.isDefined) {
            updateBytesRead()
          } else if (split.inputSplit.value.isInstanceOf[FileSplit] ||
                     split.inputSplit.value.isInstanceOf[CombineFileSplit]) {
            // If we can't get the bytes read from the FS stats, fall back to the split size,
            // which may be inaccurate.
            try {
              inputMetrics.incBytesRead(split.inputSplit.value.getLength)
            } catch {
              case e: java.io.IOException =>
                logWarning("Unable to get input size to set InputMetrics for task", e)
            }
          }
        }
      }
    }
    new InterruptibleIterator[(K, V)](context, iter)
  }