This is the 10th day of my participation in the Gwen Challenge

Nothing makes you go bald faster than reading source code

The environment

  • The Spark version used this time is 3.0.0
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.12</artifactId>
    <version>3.0.0</version>
    <scope>provided</scope>
</dependency>
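If you build with sbt rather than Maven, the equivalent dependencies would look something like the sketch below (assumption: Scala 2.12.x; pick the patch version you actually use):

// build.sbt (sketch): sbt equivalent of the Maven dependencies above
scalaVersion := "2.12.10"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "3.0.0",
  "org.apache.spark" %% "spark-yarn" % "3.0.0" % Provided
)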

1. This is the script we use to submit a job

bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode client \
  ./examples/jars/spark-examples_2.12-3.0.0.jar

2. Let's look at the spark-submit script code

if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi
# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0
# Key code
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"

As you can see, the script ultimately executes the spark-class script, passing it the arguments org.apache.spark.deploy.SparkSubmit "$@".

3. Follow the spark-class script code

# Below is some source code

# At the very end of the script, the assembled CMD is executed
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"

Where does CMD come from?
# The following code assembles the command arguments
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG"= ="true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG"= = $'\ 0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=' '
      CMD_START_FLAG="true"
    elif [ "$ARG"! ="" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")
# the build_command method is called

# If you're familiar with the JVM, -Xmx128m tells you this is splicing together a Java launch command
# The class org.apache.spark.launcher.Main assembles the final startup command
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}

Exec "${CMD[@]}


After a pile of fancy moves, org.apache.spark.launcher.Main generates the startup command, and that command runs org.apache.spark.deploy.SparkSubmit.
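Two quick asides. If you just want to see the command the launcher generates, setting the environment variable SPARK_PRINT_LAUNCH_COMMAND=1 before running spark-submit makes the launcher print it out. And the launcher module also exposes a public API that performs the same assembly programmatically; here is a minimal sketch (the Spark home path is a placeholder, adjust it to your installation):

import org.apache.spark.launcher.SparkLauncher

object LaunchDemo {
  def main(args: Array[String]): Unit = {
    // Programmatic equivalent of the shell scripts above: the launcher
    // library assembles and spawns a spark-submit child process
    val process = new SparkLauncher()
      .setSparkHome("/opt/spark") // assumption: adjust to your install
      .setAppResource("./examples/jars/spark-examples_2.12-3.0.0.jar")
      .setMainClass("org.apache.spark.examples.SparkPi")
      .setMaster("yarn")
      .setDeployMode("client")
      .launch()
    process.waitFor() // wait for spark-submit to finish
  }
}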

4. Next up: org.apache.spark.deploy.SparkSubmit

  • So let's first find the main method

  override def main(args: Array[String]): Unit = {
    // An anonymous inner class is declared
    val submit = new SparkSubmit() { ... }
    // Invoke the anonymous inner class's method
    submit.doSubmit(args)
  }
  • Then look at what's going on in the SparkSubmit anonymous inner class
    • parseArguments is overridden, and is invoked when the parent class's doSubmit runs
    • doSubmit is overridden to delegate to the parent class's doSubmit
val submit = new SparkSubmit() {
    self =>
    // parseArguments is overridden here; it is called from the parent class's doSubmit
    override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
        new SparkSubmitArguments(args) {
            override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
            override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
            override protected def logError(msg: => String): Unit = self.logError(msg)
        }
    }
    override protected def logInfo(msg: => String): Unit = printMessage(msg)
    override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
    override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")
    // When doSubmit is called, it delegates to the parent class's doSubmit
    override def doSubmit(args: Array[String]): Unit = {
        try {
            super.doSubmit(args)
        } catch {
            case e: SparkUserAppException =>
                exitFn(e.exitCode)
        }
    }
}
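A quick note on the self => line above: it creates an alias for the anonymous SparkSubmit instance, because inside the nested anonymous SparkSubmitArguments class, this refers to the inner object. A stripped-down sketch of the same trick, with invented class names:

object SelfAliasDemo {
  class Arguments {
    protected def logInfo(msg: String): Unit = println(s"[arguments] $msg")
    def parse(): Unit = logInfo("parsing arguments")
  }

  class Submit {
    protected def logInfo(msg: String): Unit = println(s"[submit] $msg")
    def parseArguments(): Arguments = new Arguments
  }

  def main(args: Array[String]): Unit = {
    val submit = new Submit() {
      self => // alias for this anonymous Submit instance
      override def parseArguments(): Arguments = new Arguments {
        // inside this nested anonymous class, `this` is the Arguments
        // instance; the alias is how we reach the enclosing Submit's logInfo
        override protected def logInfo(msg: String): Unit = self.logInfo(msg)
      }
      override protected def logInfo(msg: String): Unit = println(s"[anon submit] $msg")
    }
    submit.parseArguments().parse() // prints "[anon submit] parsing arguments"
  }
}

Without the alias, calling logInfo(msg) inside the nested override would resolve to the override itself and recurse forever instead of delegating outward.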
  def doSubmit(args: Array[String]): Unit = {
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    // parseArguments calls the methods overriding the anonymous inner class above
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    appArgs.action match {
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
  • What does the parseArguments method do? It creates a SparkSubmitArguments instance that parses the arguments
  • Take a look at the key parts of the SparkSubmitArguments source
// Since Scala executes every statement in the class body at construction time, there are a few hundred lines to skim
// Jump straight to line 108, where parse is called
  parse(args.asJava)
  
// This method mainly parses arguments such as --class, --master, etc.
// handle is the key call; the rest is string parsing
protected final void parse(List<String> args) {
    Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

    int idx = 0;
    for (idx = 0; idx < args.size(); idx++) {
        String arg = args.get(idx);
        String value = null;

        Matcher m = eqSeparatedOpt.matcher(arg);
        if (m.matches()) {
            arg = m.group(1);
            value = m.group(2);
        }

        // Look for options that take a value
        String name = findCliOption(arg, opts);
        if (name != null) {
            if (value == null) {
                if (idx == args.size() - 1) {
                    throw new IllegalArgumentException(
                        String.format("Missing argument for option '%s'.", arg));
                }
                idx++;
                value = args.get(idx);
            }
            // The key method is handle
            if (!handle(name, value)) {
                break;
            }
            continue;
        }

        // Look for switches (options without a value)
        name = findCliOption(arg, switches);
        if (name != null) {
            if (!handle(name, null)) {
                break;
            }
            continue;
        }

        if (!handleUnknown(arg)) {
            break;
        }
    }

    if (idx < args.size()) {
        idx++;
    }
    handleExtraArgs(args.subList(idx, args.size()));
}

// The handle method is defined by the (Java) parent class and overridden by SparkSubmitArguments
// Parent class code
protected boolean handle(String opt, String value) {
    // The parent just throws; subclasses are expected to override this
    throw new UnsupportedOperationException();
}

// The subclass override lives in SparkSubmitArguments
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    case NAME =>
      name = value

    // --master
    case MASTER =>
      master = value

    // --class
    case CLASS =>
      mainClass = value

    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value

    // Many parameters omitted here; check the rest yourself
    case _ =>
      error(s"Unexpected argument '$opt'.")
  }
  action != SparkSubmitAction.PRINT_VERSION
}
  
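To see the (--[^=]+)=(.+) regex and the parse/handle split in isolation, here is a tiny self-contained sketch (invented names, not Spark code) that accepts both the --master=yarn and --master yarn forms:

import java.util.regex.Pattern

object MiniOptionParser {
  private val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")

  // Mirrors the template-method split: parse() walks the arguments,
  // handle() decides what each recognised option means
  def handle(name: String, value: String): Unit =
    println(s"option $name -> $value")

  def parse(args: Seq[String]): Unit = {
    var idx = 0
    while (idx < args.size) {
      var arg = args(idx)
      var value: String = null
      val m = eqSeparatedOpt.matcher(arg)
      if (m.matches) { // "--master=yarn" splits into "--master" and "yarn"
        arg = m.group(1)
        value = m.group(2)
      } else if (idx + 1 < args.size) { // "--master yarn": value is the next token
        idx += 1
        value = args(idx)
      }
      handle(arg, value)
      idx += 1
    }
  }

  def main(args: Array[String]): Unit = {
    parse(Seq("--master=yarn", "--class", "org.apache.spark.examples.SparkPi"))
    // prints:
    // option --master -> yarn
    // option --class -> org.apache.spark.examples.SparkPi
  }
}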

  • Let’s continue with the unfinished doSubmit
  def doSubmit(args: Array[String]): Unit = {
    val uninitLog = initializeLogIfNecessary(true, silent = true)
    // parseArguments was explained above
    val appArgs = parseArguments(args)
    if (appArgs.verbose) {
      logInfo(appArgs.toString)
    }
    // Moving on: pattern matching is used here
    // In SparkSubmitArguments the action defaults to SUBMIT:
    //   action = Option(action).getOrElse(SUBMIT)
    // It can also be set via a command-line parameter
    appArgs.action match {
        // Submit
      case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
      case SparkSubmitAction.PRINT_VERSION => printVersion()
    }
  }
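As an aside, SparkSubmitAction is a Scala Enumeration, and the default mentioned in the comment comes from Option(action).getOrElse(SUBMIT). A minimal sketch of that default-plus-dispatch pattern (simplified, not the real class):

// Simplified stand-in for SparkSubmitAction (the real one is private[deploy])
object Action extends Enumeration {
  val SUBMIT, KILL, REQUEST_STATUS, PRINT_VERSION = Value
}

object DispatchDemo {
  def main(args: Array[String]): Unit = {
    // No command-line flag set an action, so the field is still null;
    // Option(null) is None, and getOrElse supplies the SUBMIT default
    val parsed: Action.Value = null
    val action = Option(parsed).getOrElse(Action.SUBMIT)

    action match {
      case Action.SUBMIT         => println("submit the application")
      case Action.KILL           => println("kill the application")
      case Action.REQUEST_STATUS => println("request status")
      case Action.PRINT_VERSION  => println("print the version")
    }
  }
}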

5. submit(appArgs, uninitLog)

  private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {

    def doRunMain() :Unit = {
      if (args.proxyUser != null) {
        // We did not pass --proxy-user, so we take the else branch; code omitted... check it yourself
      } else {
        runMain(args, uninitLog)
      }
    }

    if (args.isStandaloneCluster && args.useRest) {
      //Standalone submission
      // code omitted...
    } else {
      // We use YARN here, so we take the else branch
      // The above doRunMain is called
      doRunMain()
    }
  }
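For completeness, the proxyUser branch we skipped wraps the run in Hadoop's UserGroupInformation.doAs, so that the application executes under another identity. A rough sketch of the mechanism, assuming hadoop-common is on the classpath (the real branch adds more error handling):

import java.security.PrivilegedExceptionAction
import org.apache.hadoop.security.UserGroupInformation

object ProxyUserSketch {
  // Roughly what the proxyUser branch does: run the payload as another user
  def runAsProxy(proxyUser: String)(body: => Unit): Unit = {
    val ugi = UserGroupInformation.createProxyUser(
      proxyUser, UserGroupInformation.getCurrentUser)
    ugi.doAs(new PrivilegedExceptionAction[Unit] {
      override def run(): Unit = body
    })
  }
}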

Enter the runMain method. Only the key code is kept below; go through the rest on your own.

private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    // After the following line, childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
    val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
    ......
    // Build the classloader and classpath for the Spark runtime environment sent to YARN
    val loader = getSubmitClassLoader(sparkConf)
    for (jar <- childClasspath) {
      addJarToClasspath(jar, loader)
    }
    ......
    // Load the org.apache.spark.deploy.yarn.YarnClusterApplication class here
    var mainClass: Class[_] = null
    try {
      mainClass = Utils.classForName(childMainClass)
    } catch { ... }
    // YarnClusterApplication implements SparkApplication
    val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
      // Create the YarnClusterApplication instance
      mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
    } else {
      new JavaMainApplication(mainClass)
    }
    .........
    try {
      // Start it
      app.start(childArgs.toArray, sparkConf)
    } catch {
      case t: Throwable =>
        throw findCause(t)
    }
}

// This method is several hundred lines long; most of it is omitted here
private[deploy] def prepareSubmitEnvironment(
      args: SparkSubmitArguments,
      conf: Option[HadoopConfiguration] = None)
      : (Seq[String], Seq[String], SparkConf, String) = {
    ...
    // Focus on YARN_CLUSTER_SUBMIT_CLASS:
    // private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"

    // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
    if (isYarnCluster) {
      // Take the constant YARN_CLUSTER_SUBMIT_CLASS,
      // which gives childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
      childMainClass = YARN_CLUSTER_SUBMIT_CLASS
      ...
    }
    ...
    (childArgs, childClasspath, sparkConf, childMainClass)
}
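The reflection dance in runMain (load a class by name, check that it implements the expected interface, instantiate it through the no-arg constructor) is plain JVM reflection. A standalone sketch with an invented trait and class:

trait Application {
  def start(args: Array[String]): Unit
}

class DemoApp extends Application {
  override def start(args: Array[String]): Unit =
    println("started with " + args.mkString(" "))
}

object ReflectionSketch {
  def main(cmdArgs: Array[String]): Unit = {
    // The same three steps runMain performs:
    val mainClass = Class.forName("DemoApp") // Utils.classForName
    val app =
      if (classOf[Application].isAssignableFrom(mainClass)) // is it a SparkApplication?
        mainClass.getConstructor().newInstance().asInstanceOf[Application]
      else
        sys.error(s"${mainClass.getName} does not implement Application")
    app.start(Array("--master", "yarn")) // app.start(childArgs, sparkConf)
  }
}

In the real code, JavaMainApplication is the fallback for classes that do not implement SparkApplication and just define a static main method.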

6. Next: the start method of YarnClusterApplication

That's all the grinding I can manage for now. Part 2 is coming soon.....