Nothing makes you go bald faster than reading source code.
1. Environment
- The Spark version used this time is 3.0.0
```xml
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.0.0</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-yarn_2.12</artifactId>
    <version>3.0.0</version>
    <scope>provided</scope>
</dependency>
```
This is the script we use to submit the job:

```bash
bin/spark-submit \
  --class org.apache.spark.examples.SparkPi \
  --master yarn \
  --deploy-mode client \
  ./examples/jars/spark-examples_2.12-3.0.0.jar
```
2. Let’s look at the `spark-submit` script code
```bash
if [ -z "${SPARK_HOME}" ]; then
  source "$(dirname "$0")"/find-spark-home
fi

# disable randomized hash for string in Python 3.3+
export PYTHONHASHSEED=0

# Key line
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
```
As you can see, the script just execs the `spark-class` script, passing `org.apache.spark.deploy.SparkSubmit "$@"` as the arguments.
3. Following up: the `spark-class` script code
Here are the relevant pieces of the script. Start from the last two lines, where the assembled CMD is executed:

```bash
CMD=("${CMD[@]:0:$LAST}")
exec "${CMD[@]}"
```

Where does CMD come from?

```bash
# The following loop assembles the command
CMD=()
DELIM=$'\n'
CMD_START_FLAG="false"
while IFS= read -d "$DELIM" -r ARG; do
  if [ "$CMD_START_FLAG" == "true" ]; then
    CMD+=("$ARG")
  else
    if [ "$ARG" == $'\0' ]; then
      # After NULL character is consumed, change the delimiter and consume command string.
      DELIM=''
      CMD_START_FLAG="true"
    elif [ "$ARG" != "" ]; then
      echo "$ARG"
    fi
  fi
done < <(build_command "$@")

# The loop reads from build_command.
# If -Xmx128m looks familiar, that's because this is part of a Java launch command:
# it runs the class org.apache.spark.launcher.Main, which assembles the final start command.
build_command() {
  "$RUNNER" -Xmx128m $SPARK_LAUNCHER_OPTS -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
  printf "%d\0" $?
}
```
`org.apache.spark.launcher.Main` pulls a pile of tricks to generate the start command; the command it produces runs `org.apache.spark.deploy.SparkSubmit`.
4. Next: `org.apache.spark.deploy.SparkSubmit`
- First, let's find the main method
```scala
override def main(args: Array[String]): Unit = {
  // An anonymous inner class is declared
  val submit = new SparkSubmit() { ... }
  // Invoke the anonymous inner class's method
  submit.doSubmit(args)
}
```
- Then look at what's going on in the `SparkSubmit` anonymous inner class:
  - It overrides `parseArguments`, which is called when the parent class's `doSubmit` runs
  - When its `doSubmit` is called, it delegates to the parent class's `doSubmit`
```scala
val submit = new SparkSubmit() {
  self =>
  // parseArguments is overridden here and gets called from the parent's doSubmit
  override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    new SparkSubmitArguments(args) {
      override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
      override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
      override protected def logError(msg: => String): Unit = self.logError(msg)
    }
  }

  override protected def logInfo(msg: => String): Unit = printMessage(msg)
  override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
  override protected def logError(msg: => String): Unit = printMessage(s"Error: $msg")

  // When doSubmit is called, it calls the parent class's doSubmit
  override def doSubmit(args: Array[String]): Unit = {
    try {
      super.doSubmit(args)
    } catch {
      case e: SparkUserAppException =>
        exitFn(e.exitCode)
    }
  }
}
```
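A side note on the `self =>` line: it gives the enclosing anonymous class an alias, so code inside the nested `SparkSubmitArguments` can still reach the outer instance's methods (plain `this` would refer to the inner class). A minimal sketch of the idiom, with hypothetical names (not Spark code):

```scala
// Minimal sketch of the `self =>` alias idiom (hypothetical names, not Spark code).
class Outer {
  self => // alias for this Outer instance

  def log(msg: String): Unit = println(s"outer: $msg")

  def makeInner(): Runnable = new Runnable {
    // Inside the anonymous class, `this` is the Runnable,
    // so the alias is used to reach the enclosing instance.
    override def run(): Unit = self.log("called from inner")
  }
}

object SelfAliasDemo extends App {
  new Outer().makeInner().run() // prints: outer: called from inner
}
```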
Back in `SparkSubmit`, here is the parent class's `doSubmit`:

```scala
def doSubmit(args: Array[String]): Unit = {
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  // parseArguments calls the method overridden in the anonymous inner class above
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
```
What does the `parseArguments` method do? It creates a `SparkSubmitArguments` object that parses the parameters. Let's take a look at the key source of `SparkSubmitArguments`:
```scala
// Scala executes every statement in the class body as part of the constructor,
// so there are hundreds of lines here. Go straight to line 108:
parse(args.asJava)
```

This `parse` call lands in the Java parent class `SparkSubmitOptionParser`. The method mainly resolves arguments such as `--class`, `--master`, etc. `handle` is the key call; the rest is string parsing:

```java
protected final void parse(List<String> args) {
  Pattern eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)");

  int idx = 0;
  for (idx = 0; idx < args.size(); idx++) {
    String arg = args.get(idx);
    String value = null;

    // Split "--key=value" style arguments
    Matcher m = eqSeparatedOpt.matcher(arg);
    if (m.matches()) {
      arg = m.group(1);
      value = m.group(2);
    }

    // Look for options that take a value
    String name = findCliOption(arg, opts);
    if (name != null) {
      if (value == null) {
        if (idx == args.size() - 1) {
          throw new IllegalArgumentException(
              String.format("Missing argument for option '%s'.", arg));
        }
        idx++;
        value = args.get(idx);
      }
      // The key method is handle
      if (!handle(name, value)) {
        break;
      }
      continue;
    }

    // Look for a switch (an option that takes no value)
    name = findCliOption(arg, switches);
    if (name != null) {
      if (!handle(name, null)) {
        break;
      }
      continue;
    }

    if (!handleUnknown(arg)) {
      break;
    }
  }

  if (idx < args.size()) {
    idx++;
  }
  handleExtraArgs(args.subList(idx, args.size()));
}
```
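As a quick aside, the `eqSeparatedOpt` pattern is what splits `--key=value` style arguments into the option name and its value. A small self-contained check of that regex (demo code, not from Spark):

```scala
import java.util.regex.Pattern

// Demo: the same "--key=value" pattern used by parse above.
object EqSeparatedDemo extends App {
  val eqSeparatedOpt = Pattern.compile("(--[^=]+)=(.+)")
  val m = eqSeparatedOpt.matcher("--master=yarn")
  if (m.matches()) {
    println(m.group(1)) // --master
    println(m.group(2)) // yarn
  }
}
```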
The `handle` method is defined in the parent class and overridden by `SparkSubmitArguments`.

Parent class code:

```java
protected boolean handle(String opt, String value) {
  // The base implementation just throws; subclasses must override it
  throw new UnsupportedOperationException();
}
```

The subclass code is in `SparkSubmitArguments`:

```scala
override protected def handle(opt: String, value: String): Boolean = {
  opt match {
    case NAME =>
      name = value
    // --master
    case MASTER =>
      master = value
    // --class
    case CLASS =>
      mainClass = value
    case DEPLOY_MODE =>
      if (value != "client" && value != "cluster") {
        error("--deploy-mode must be either \"client\" or \"cluster\"")
      }
      deployMode = value
    // Many parameters omitted here; see the source for the rest
    case _ =>
      error(s"Unexpected argument '$opt'.")
  }
  action != SparkSubmitAction.PRINT_VERSION
}
```
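The overall shape is a classic template method: the parent's generic loop finds the options, and a subclass hook decides what each one means. A minimal sketch of that split, with hypothetical names (not Spark code):

```scala
// Minimal sketch of the parse/handle split (hypothetical names, not Spark code).
abstract class MiniParser {
  // Subclass hook: decide what an option means; return false to stop parsing.
  protected def handle(opt: String, value: String): Boolean

  // Generic loop: walk "--opt value" pairs and delegate to the hook.
  def parse(args: List[String]): Unit = {
    val pairs = args.grouped(2)
    var keepGoing = true
    while (keepGoing && pairs.hasNext) {
      keepGoing = pairs.next() match {
        case List(opt, value) => handle(opt, value)
        case _                => false
      }
    }
  }
}

class MiniArguments extends MiniParser {
  var master: String = _
  var mainClass: String = _

  override protected def handle(opt: String, value: String): Boolean = {
    opt match {
      case "--master" => master = value
      case "--class"  => mainClass = value
      case other      => sys.error(s"Unexpected argument '$other'.")
    }
    true
  }
}

object MiniParserDemo extends App {
  val a = new MiniArguments
  a.parse(List("--master", "yarn", "--class", "org.apache.spark.examples.SparkPi"))
  println(s"master=${a.master}, class=${a.mainClass}")
}
```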
- Let’s continue with the unfinished doSubmit
```scala
def doSubmit(args: Array[String]): Unit = {
  val uninitLog = initializeLogIfNecessary(true, silent = true)
  // parseArguments was explained above
  val appArgs = parseArguments(args)
  if (appArgs.verbose) {
    logInfo(appArgs.toString)
  }
  // Continue here: pattern matching on the action.
  // In SparkSubmitArguments the action defaults via
  //   action = Option(action).getOrElse(SUBMIT)
  // and it can also be specified with parameters
  appArgs.action match {
    // Submit
    case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    case SparkSubmitAction.PRINT_VERSION => printVersion()
  }
}
```
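Why does the action default to SUBMIT? `Option(x)` turns a `null` into `None`, so `getOrElse` supplies the fallback. A tiny self-contained illustration of the idiom (demo code, not from Spark):

```scala
// Demo of the Option(x).getOrElse(default) idiom behind the action default.
object ActionDefaultDemo extends App {
  sealed trait Action
  case object SUBMIT extends Action
  case object KILL extends Action

  var action: Action = null // nothing was parsed from the command line
  val effective = Option(action).getOrElse(SUBMIT)
  println(effective) // SUBMIT: null falls back to the default
}
```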
5. `submit(appArgs, uninitLog)`
```scala
private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      // We did not specify a proxy user, so we take the else branch; code omitted... check it yourself
    } else {
      runMain(args, uninitLog)
    }
  }
  if (args.isStandaloneCluster && args.useRest) {
    // Standalone submission
    // code omitted...
  } else {
    // We are using YARN here, so we take the else branch,
    // which calls the doRunMain defined above
    doRunMain()
  }
}
```
Enter the runMain method. Only the key code is kept here; read the rest on your own.
```scala
private def runMain(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
  // After the following line, childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
  ......
  // Set up the classloader and add the jars to the classpath
  val loader = getSubmitClassLoader(sparkConf)
  for (jar <- childClasspath) {
    addJarToClasspath(jar, loader)
  }
  ......
  // Load the org.apache.spark.deploy.yarn.YarnClusterApplication class here
  var mainClass: Class[_] = null
  try {
    mainClass = Utils.classForName(childMainClass)
  } catch { ... }
  // YarnClusterApplication extends SparkApplication
  val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    // Create the YarnClusterApplication instance
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
  ......
  try {
    // Start it
    app.start(childArgs.toArray, sparkConf)
  } catch {
    case t: Throwable => throw findCause(t)
  }
}
```
```scala
// This method is several hundred lines; most of it is omitted
private[deploy] def prepareSubmitEnvironment(
    args: SparkSubmitArguments,
    conf: Option[HadoopConfiguration] = None)
    : (Seq[String], Seq[String], SparkConf, String) = {
  ...
  // Focus on YARN_CLUSTER_SUBMIT_CLASS:
  // private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"
  // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
  if (isYarnCluster) {
    // Using the constant YARN_CLUSTER_SUBMIT_CLASS gives
    // childMainClass = "org.apache.spark.deploy.yarn.YarnClusterApplication"
    childMainClass = YARN_CLUSTER_SUBMIT_CLASS
    ...
  }
  ...
  (childArgs, childClasspath, sparkConf, childMainClass)
}
```
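The interesting move in runMain is plain JVM reflection: load a class by name, check that it implements the `SparkApplication` interface, and instantiate it through its no-arg constructor. A minimal sketch of that load-check-instantiate pattern, with hypothetical names (not Spark code):

```scala
// Minimal sketch of the load-check-instantiate pattern (hypothetical names).
trait MiniApplication {
  def start(args: Array[String]): Unit
}

class HelloApp extends MiniApplication {
  override def start(args: Array[String]): Unit =
    println(s"started with: ${args.mkString(" ")}")
}

object ReflectionDemo extends App {
  // Like Utils.classForName(childMainClass) in runMain
  // (assumes HelloApp is on the classpath in the default package)
  val mainClass: Class[_] = Class.forName("HelloApp")

  val app: MiniApplication =
    if (classOf[MiniApplication].isAssignableFrom(mainClass)) {
      // Like mainClass.getConstructor().newInstance() in runMain
      mainClass.getConstructor().newInstance().asInstanceOf[MiniApplication]
    } else {
      sys.error(s"${mainClass.getName} is not a MiniApplication")
    }

  app.start(Array("--master", "yarn"))
}
```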
6. Next: the `start` method of `YarnClusterApplication`

I can't grind any further today. Chapter 2 is coming soon.....