1. Preface

We already know the client-mode and cluster-mode submission processes from the previous article, Spark Basics 06 - Spark Client and Cluster Submit Process:

  1. Start the Driver process and register the application with the cluster manager
  2. The cluster manager allocates resources and starts Executors according to the application's requirements
  3. After the Executors start, they register back with the Driver, which now has enough resources to run
  4. The Driver executes the main function. Spark is lazily evaluated, so when an action operator is reached, the Driver works backwards from it, splits the job into stages at wide dependencies, and distributes tasks to the assigned Executors for execution
  5. After all tasks of a stage finish, the intermediate results are written to local disk files on each node, and the Driver then schedules the next stage. The input of the next stage's tasks is the intermediate output of the previous stage
  6. Executors communicate with the Driver throughout execution to report task status

The biggest difference between the two submission modes lies in where the Driver starts: in client mode, the Driver starts on the local machine, while in cluster mode, the Driver starts on a node in the cluster

Let’s verify the submission process by tracing the source code

2. The submit shell script

A Spark job is submitted by calling ${SPARK_HOME}/bin/spark-submit. An example is given below

# Cluster mode; --class is the class to start, and the last argument is the path to the application jar
spark-submit \
--master spark://127.0.0.1:7077 \
--deploy-mode cluster \
--driver-memory 1g \
--executor-memory 1g \
--executor-cores 1 \
--class org.apache.spark.examples.SparkPi \
${SPARK_HOME}/examples/jars/spark-examples.jar

Looking at the spark-submit script, we can see that it launches the org.apache.spark.deploy.SparkSubmit class and passes our arguments through to it

3. SparkSubmit Submission process 01-SparkSubmit

This article uses Spark source version 2.3.4. Entering the SparkSubmit class, we get straight to the point and find the SparkSubmit.main() method

The main() method wraps the input arguments in a SparkSubmitArguments object and then dispatches to the corresponding method depending on the action type. Here we will focus on tracking the submission path
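
Below is a minimal sketch of that dispatch, lightly simplified from the 2.3.4 source (log initialization and verbose printing are omitted):

// SparkSubmit.main(), simplified: parse the arguments, then dispatch on the requested action
override def main(args: Array[String]): Unit = {
  val appArgs = new SparkSubmitArguments(args)
  appArgs.action match {
    case SparkSubmitAction.SUBMIT => submit(appArgs)          // the path we follow in this article
    case SparkSubmitAction.KILL => kill(appArgs)
    case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
  }
}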

Going into SparkSubmit.submit(), we see that doRunMain() is defined locally, followed by a branch; whichever branch is taken, it ends up calling doRunMain()

Entering the doRunMain() method, you can see that there are two branches: one executes runMain() as a proxy user, and the other executes runMain() directly
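
A rough sketch of this structure, heavily trimmed (exception handling and the standalone REST fallback are omitted, and the argument list of runMain() is abbreviated):

// SparkSubmit.submit(), simplified sketch
private def submit(args: SparkSubmitArguments): Unit = {

  def doRunMain(): Unit = {
    if (args.proxyUser != null) {
      // runMain() is wrapped in proxyUser.doAs(...) so it executes as the proxy user
    } else {
      runMain(args) // in the source, the prepared child args/classpath/conf are passed here
    }
  }

  // Whichever branch is taken (REST-based standalone cluster or everything else),
  // it ends by invoking doRunMain()
  doRunMain()
}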

Go into SparkSubmit.runMain() and focus on its comment: run the child class's main method using the provided submit arguments, in two steps

  1. Prepare the launch environment: set the appropriate classpath, system properties, and application arguments for running the child main class, based on the cluster manager and deploy mode
  2. Use this launch environment to invoke the main method of the child main class

As you can see from the comment, besides preparing the environment for runMain, the only things we need to focus on are the deploy mode and the childMainClass property

The prepareSubmitEnvironment(args) method returns a 4-tuple containing the childMainClass property we need to focus on

Entering the SparkSubmit.prepareSubmitEnvironment() method, you can see that it is just a pass-through: it actually invokes the doPrepareSubmitEnvironment() method. The comments here describe the returned 4-tuple:

// childArgs: arguments for the child process
// childClasspath: a list of classpath entries for the child
// sparkConf: a map of system properties
// childMainClass: the main class for the child

Entering SparkSubmit.doPrepareSubmitEnvironment(), the first thing to notice is that this method is a full 537 lines of code. We will not analyze the whole method here; we only focus on two things: deployMode and childMainClass

In SparkSubmit.doPrepareSubmitEnvironment(), at around line 612, you can see that if the deploy mode deployMode is client mode, childMainClass is assigned args.mainClass:

# args.mainClass is the class passed to spark-submit via --class
spark-submit \
--class org.apache.spark.examples.SparkPi
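
In other words, the client-mode branch boils down to something like this (a minimal sketch; the variable names follow the source):

// doPrepareSubmitEnvironment(), client-mode branch, simplified:
// in client mode the child main class is simply the user's --class
if (deployMode == CLIENT) {
  childMainClass = args.mainClass
}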

Now let's go to around line 662. This article uses the standalone cluster submission mode, i.e. StandaloneCluster. The code logic, simplified:

// Whether to use REST mode
if (args.useRest) {
  // This branch is ignored here
} else {
  // Assign childMainClass
  childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
  // Assign childArgs
  childArgs += (args.master, args.primaryResource, args.mainClass)
}
if (args.childArgs != null) {
  childArgs ++= args.childArgs
}

As you can see from the code logic, childMainClass ends up assigned STANDALONE_CLUSTER_SUBMIT_CLASS, and childArgs is populated with a few values as well

Let's look at where STANDALONE_CLUSTER_SUBMIT_CLASS is assigned: the fully qualified class name of ClientApp is assigned to the STANDALONE_CLUSTER_SUBMIT_CLASS constant
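
In the source this is a one-line constant definition, roughly (sketched from the 2.x code):

// SparkSubmit: the legacy standalone-cluster submit class is ClientApp
private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName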

Now let’s look at ClientApp, which is a subclass of SparkApplication

Continuing into the runMain() method (called from doRunMain()), at around line 850 we can see that mainClass is loaded by reflection from childMainClass

Now let's follow mainClass to around line 877; the logic, simplified:

// Determine whether mainClass is a SparkApplication
val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
  // If it is a SparkApplication, create an instance directly
  mainClass.newInstance().asInstanceOf[SparkApplication]
} else {
  // Not a SparkApplication: mainClass is the class named by --class,
  // so wrap it in a JavaMainApplication
  new JavaMainApplication(mainClass)
}

// Call the instance's start() method, passing in the child arguments
app.start(childArgs.toArray, sparkConf)

Going into JavaMainApplication, we can see that its start() method looks up the main method of mainClass by reflection and invokes it directly in the current JVM. In client submission mode, this is why the Driver is started on the client's local machine
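
For reference, JavaMainApplication looks roughly like this (a simplified sketch; the static-method check and error handling are trimmed):

// JavaMainApplication wraps a plain main class and runs it via reflection
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // Look up the static main(String[]) method of the user's class
    val mainMethod = klass.getMethod("main", classOf[Array[String]])

    // Push the SparkConf entries into system properties so the user code can see them
    conf.getAll.foreach { case (k, v) => sys.props(k) = v }

    // Invoke the user's main method in the current (local) JVM
    mainMethod.invoke(null, args)
  }
}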

Summary:

  • spark-submit is a script that calls the SparkSubmit class; SparkSubmit encapsulates and validates our arguments and uses deployMode to decide where the code of the class we submitted via --class is executed
  • In client mode, mainClass is executed on the local machine
  • In cluster mode, mainClass is wrapped up as a parameter and handed to ClientApp for the next step

Let’s draw a diagram of the call process

4. SparkSubmit Submission process 02-ClientApp

Since this article uses the standalone cluster mode, and mainClass is therefore ClientApp, let's look at the ClientApp.start() method

As we know from the previous article, Spark Basics 06 - Spark Client and Cluster Submit Process, the client needs to communicate with the Master to start the Driver, so we can guess that the client needs an RpcEnv environment and a reference to the Master in order to talk to it. Let's see whether the source matches our guess

The code logic is shown below: the RpcEnv environment and the Master references can both be seen in the source. What we should focus on is that the driverArgs parameters and the Master references are passed into the ClientEndpoint endpoint class, and rpcEnv.setupEndpoint() is used to register that endpoint. If you have read Spark source code before, you know that once an endpoint is registered, its onStart() method gets called. So the next thing to focus on is the onStart() method of the ClientEndpoint class

// The arguments are converted into Driver arguments
val driverArgs = new ClientArguments(args)

// Prepare the RpcEnv
val rpcEnv =
  RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

// Get the Master references
val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
  map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))

// Register the ClientEndpoint
rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

Entering the ClientEndpoint.onStart() method, we can see an important parameter: mainClass = "org.apache.spark.deploy.worker.DriverWrapper". This looks a lot like the Driver we want to start. The code logic, simplified, is below

mainClass and driverArgs are finally wrapped into a DriverDescription object, and then asyncSendToMasterAndForwardReply() is called to send a RequestSubmitDriver message to the Master

// Define the mainClass
val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"

val command = new Command(mainClass,
  Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
  sys.env, classPathEntries, libraryPathEntries, javaOpts)

// Wrap mainClass (via command) into a DriverDescription()
val driverDescription = new DriverDescription(
  driverArgs.jarUrl,
  driverArgs.memory,
  driverArgs.cores,
  driverArgs.supervise,
  command)

// Asynchronously send a RequestSubmitDriver message to the Master
asyncSendToMasterAndForwardReply[SubmitDriverResponse](
  RequestSubmitDriver(driverDescription))

Next we look at the asyncSendToMasterAndForwardReply() method: it sends the RequestSubmitDriver message to the Master through the Master endpoint references. Note that the ask() method is used here, so we should look for the message-handling logic in the Master.receiveAndReply() method
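
Its body is roughly the following (a simplified sketch reconstructed from the 2.x source, so treat the details as approximate):

// ClientEndpoint: ask every known Master and forward the reply back to this endpoint
private def asyncSendToMasterAndForwardReply[T: ClassTag](message: Any): Unit = {
  for (masterEndpoint <- masterEndpoints) {
    masterEndpoint.ask[T](message).onComplete {
      case Success(v) => self.send(v) // forward the Master's reply to ourselves
      case Failure(e) => logWarning(s"Error sending messages to master $masterEndpoint", e)
    }(forwardMessageExecutionContext)
  }
}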

Summary:

  • org.apache.spark.deploy.worker.DriverWrapper is probably the class used to start the Driver
  • The job of the ClientApp class is to prepare for communication with the Master: it sets up the RpcEnv communication environment and wraps the required parameters into a DriverDescription object. Finally, it asynchronously sends a RequestSubmitDriver message to every Master reference it holds

Now let's add this step to the call flowchart

5. SparkSubmit Submission process 03-Master

In Master.receiveAndReply(): ClientApp has sent the RequestSubmitDriver message to the Master, with the required parameters packed into the DriverDescription. Looking at the handling logic for the RequestSubmitDriver message, you can see that a driver is obtained by calling createDriver(description)

Go into the Master.createDriver() method and you can see that it simply wraps the desc argument into a DriverInfo object and returns it
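
For reference, it is only a few lines (sketched from the 2.x source):

// Master.createDriver(): wrap the DriverDescription into a DriverInfo with an id and timestamp
private def createDriver(desc: DriverDescription): DriverInfo = {
  val now = System.currentTimeMillis()
  val date = new Date(now)
  new DriverInfo(now, newDriverId(date), desc, date)
}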

Back in the receiveAndReply() method, the code logic shows that the driver parameters are wrapped as a DriverInfo and stored in the Master's driver collections. Then the schedule() method is called, and a reply is sent back to the client.

So far nothing has actually started the Driver, which leaves the schedule() method as the place where the Driver gets started

// Further wrap the driver description as a DriverInfo
val driver = createDriver(description)
// Add it to the collections
persistenceEngine.addDriver(driver)
waitingDrivers += driver
drivers.add(driver)
// Schedule
schedule()
// Reply to the client
context.reply(SubmitDriverResponse(self, true, Some(driver.id),
  s"Driver successfully submitted as ${driver.id}"))

The schedule() method is also called when a Worker registers, but we did not look into it then. Now let's look at schedule()

The schedule() method schedules the currently available resources among the waiting applications. It is called every time a new application joins or resource availability changes. The consumers of these resources include, for example, Drivers and Executors on Workers

Simplifying the logic of schedule() around the collections we just saw (waitingDrivers, drivers, persistenceEngine, and so on):

  • Driver related
    • Iterate over the set of Driver information waiting to start
    • Find workers that meet the requirements
    • Call the launchDriver() method to start the Driver
private def schedule(): Unit = {

    // Find the Workers that are still alive
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0

    /* 1. Driver-related scheduling */
    // 1.1. Iterate over the Drivers in the waiting set
    for (driver <- waitingDrivers.toList) {
      var launched = false
      var numWorkersVisited = 0
      // 1.2. Find a suitable Worker
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          // 1.3. Start the Driver
          launchDriver(worker, driver)
          waitingDrivers -= driver
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }

    /* 2. Executor-related scheduling */
    startExecutorsOnWorkers()
  }

Let's look at the launchDriver() method that starts the Driver: it records the Driver information on the chosen Worker, then takes the Worker's endpoint reference and sends a LaunchDriver message to it, passing the Driver information along
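
Roughly (a sketch of the 2.x source):

// Master.launchDriver(): record the driver on the chosen worker, then tell that worker to start it
private def launchDriver(worker: WorkerInfo, driver: DriverInfo): Unit = {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}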

Summary:

  • All the Master does is take the Driver information, find a Worker that meets the requirements among the Workers it manages, and then send the LaunchDriver message to that Worker so that the Worker starts the Driver

6. SparkSubmit Submission process 04-Worker

Back in Worker.receive(), we find the handler for the LaunchDriver message. We can see that the Driver information is wrapped into a DriverRunner, whose start() method is then called to start the Driver, and the Worker's resource-usage bookkeeping is updated
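
The handler looks roughly like this (a simplified sketch of the 2.x source; some DriverRunner constructor arguments are abbreviated):

// Worker.receive(), LaunchDriver handler (simplified)
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  // Wrap the driver description into a DriverRunner, which starts the driver process in its own thread
  val driver = new DriverRunner(conf, driverId, workDir, sparkHome,
    driverDesc, self, workerUri, securityMgr)
  drivers(driverId) = driver
  driver.start()
  // Update this Worker's resource bookkeeping
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem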

Entering DriverRunner.start(), we do not need to pay too much attention to the details of how the process is launched, because launching a Java process is implemented differently on Windows and Linux. What we need to know is that the class it launches is the org.apache.spark.deploy.worker.DriverWrapper class we saw earlier

Moving on to the DriverWrapper class, we can see that there is another mainClass. This mainClass is the fully qualified class name we submitted via --class. From the code logic, mainClass is loaded by reflection inside DriverWrapper, and finally invoke() is called to execute its main method. In this way DriverWrapper runs the code we submitted, and this is our Driver
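
A condensed sketch of DriverWrapper.main() (names follow the 2.x source; the class-loader setup for the user jar is trimmed):

// DriverWrapper.main(), simplified: watch the worker, then run the user's main class by reflection
object DriverWrapper {
  def main(args: Array[String]): Unit = {
    args.toList match {
      case workerUrl :: userJar :: mainClass :: extraArgs =>
        // An RpcEnv plus a WorkerWatcher, so the driver exits if its worker dies
        val conf = new SparkConf()
        val rpcEnv = RpcEnv.create("Driver", Utils.localHostName(), 0, conf,
          new SecurityManager(conf))
        rpcEnv.setupEndpoint("workerWatcher", new WorkerWatcher(rpcEnv, workerUrl))

        // Load the class we passed with --class and invoke its main() method by reflection
        val clazz = Utils.classForName(mainClass)
        val mainMethod = clazz.getMethod("main", classOf[Array[String]])
        mainMethod.invoke(null, extraArgs.toArray[String])

        rpcEnv.shutdown()

      case _ =>
        System.err.println("Usage: DriverWrapper <workerUrl> <userJar> <driverMainClass> [options]")
        System.exit(-1)
    }
  }
}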

At this point, our Driver startup process has been tracked

Summary:

  • The Worker receives the Driver information and starts the Driver on its own machine; the mainClass it launches is the org.apache.spark.deploy.worker.DriverWrapper class that was passed in as a startup parameter
  • The source confirms that org.apache.spark.deploy.worker.DriverWrapper is the Driver instance we have been looking for
  • At this point, the Driver startup process is almost complete

7. Summary

  • The roles involved in the Spark submission process are as follows:
    • (SparkSubmit ->) Client -> Master -> Worker -> Driver (-> Executor)
  • (For standalone cluster mode) the Client instance is ClientApp, and the Driver instance is DriverWrapper
  • We have now traced the source code of Spark's resource layer and the entry point of the computing layer. What follows is the main part of this Spark source-code series: the analysis of SparkContext