1. Preface

Master is a core role in Spark. It takes part in cluster communication and handles resource scheduling requests. It not only receives registration requests from Drivers and Workers, but also needs to track the status of other roles, such as Executor and Driver, in order to schedule them.

From this we can infer that the Master needs a transport layer (TransportServer) dedicated to sending and receiving messages, as shown in the following figure

As the figure above suggests, once many instances share the transport layer, problems appear: which instance should message A be delivered to, and which instance should receive message B? To solve this, we can add a Dispatcher on top of the transport layer, as shown in the figure below

The Dispatcher solves the problem of routing messages among multiple instances, but it also leads to a new problem. With many instances there are many messages in flight, and they pile up in the transport layer. Therefore, we can add a message queue between the transport layer and the Dispatcher to buffer the traffic, as shown in the following figure

The message queue relieves the message backlog. The architecture diagram above also shows that the instances are still the ones actively pushing data. To further reduce the pressure on the instances, an Inbox is introduced as the carrier of messages, and the Dispatcher takes the initiative to pull messages from the inbox, further freeing up the instances, as shown in the following figure

So far we have drawn our own Master architecture diagram. Now let's explore the Spark source code.

2. Master source code analysis

2.1. Spark cluster startup process

Looking at the Spark cluster startup command, ${SPARK_HOME}/sbin/start-all.sh actually calls two other scripts to start the Master and the Workers: ${SPARK_HOME}/sbin/start-master.sh and ${SPARK_HOME}/sbin/start-slaves.sh

${SPARK_HOME}/sbin/start-master.sh starts the Master by launching the class org.apache.spark.deploy.master.Master

${SPARK_HOME}/sbin/start-slaves.sh ultimately calls ${SPARK_HOME}/sbin/start-slave.sh (with the trailing s missing), which starts the Worker by launching the class org.apache.spark.deploy.worker.Worker

2.2. Master creation process

2.2.1 Trace process 01: Master overall process

In this article, the Spark source version is 2.3.4. Open the Master class diagram and you can see the objects associated with Master. We start directly from main()

The main() method calls startRpcEnvAndEndpoint() of the same class and passes arguments to the method. The startRpcEnvAndEndpoint() method does two things:

  • First, prepare the Rpc environment
  • Second, register the Master with that Rpc environment, which is what actually starts the Master

Step into the RpcEnv.create() method. RpcEnv is an abstract class, and the details are implemented by the NettyRpcEnv class

NettyRpcEnv extends RpcEnv, and it is the only implementation of RpcEnv. Therefore, Spark's underlying Rpc transport layer has only a Netty implementation

Looking at the properties of NettyRpcEnv, you can see several key properties:

  • A Dispatcher

    private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)

  • A TransportContext, which contains the Dispatcher

    private val transportContext = new TransportContext(transportConf,
      new NettyRpcHandler(dispatcher, this, streamManager))

Do these two properties look similar to the architecture diagram above?

Summary:
  • The Spark cluster is started by creating Master and Worker processes using scripts in the ${SPARK_HOME}/sbin/ directory

  • There are two steps to start the Master process:

    • One is to create the RpcEnv environment; for now, the Rpc transport layer has only a Netty implementation
    • The second is to register the Master with the Rpc environment, which is where the Master process is actually started
  • Two properties of NettyRpcEnv, Dispatcher and TransportContext, resemble the Master architecture diagram we sketched earlier

Now let’s record the known classes and call relationships

2.2.2 Trace process 02: RpcEnv source code, part 1, the TransportServer transport layer service

Step into the new NettyRpcEnvFactory().create(config) method. The source code comment there explains that the Java serializer is used because a Java serializer instance is safe to use from multiple threads.

Here’s an interesting piece of code to explain

// This uses a Scala feature: the function is defined here but not executed yet
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
  nettyEnv.startServer(config.bindAddress, actualPort)
  (nettyEnv, nettyEnv.address.port)
}
// ...
// The function above is actually executed here
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1

Utils.startServiceOnPort() is what starts the service: it invokes the startNettyRpcEnv function passed to it.
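The define-now, run-later pattern above can be tried without Spark. The following is a minimal sketch, not Spark's Utils.startServiceOnPort: the object PortRetrySketch, the fakeServer helper, and the retriesLeft parameter are assumptions made for illustration. It calls the passed-in function and moves on to the next port whenever that function throws a BindException.

```scala
import java.net.BindException
import scala.annotation.tailrec

object PortRetrySketch {
  /**
   * Hypothetical, simplified stand-in for a "start service on port" helper.
   * startService is only a value until this method calls it.
   */
  @tailrec
  def startServiceOnPort[T](
      startPort: Int,
      startService: Int => (T, Int),
      retriesLeft: Int): (T, Int) = {
    val attempt: Option[(T, Int)] =
      try {
        Some(startService(startPort)) // the function finally runs here
      } catch {
        // Port is taken: swallow the error only while retries remain
        case _: BindException if retriesLeft > 0 => None
      }
    attempt match {
      case Some(result) => result
      case None => startServiceOnPort(startPort + 1, startService, retriesLeft - 1)
    }
  }

  /** Fake "server" for the demo: binding fails on every port in busyPorts. */
  def fakeServer(busyPorts: Set[Int]): Int => (String, Int) = { port =>
    if (busyPorts.contains(port)) throw new BindException(s"Port $port is already in use")
    else (s"server@$port", port)
  }
}
```

For example, PortRetrySketch.startServiceOnPort(7077, PortRetrySketch.fakeServer(Set(7077, 7078)), 5) skips the two busy ports and returns the service together with the port it actually bound to, which is why the function type is Int => (T, Int).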

Step into the nettyEnv.startServer() method and you will see the two important NettyRpcEnv properties we need to focus on, Dispatcher and TransportContext

Step into the transportContext.createServer() method. This method creates a TransportServer; according to its comment, it creates a server and binds it to a specific host and port, so this is where the transport layer service is really started

Inside TransportServer, construction calls init(), and init() is where the Netty implementation lives. I will find a later chapter to fill in the Netty explanation.

Here we need to pay attention to another important property, RpcHandler. As shown in the NettyRpcEnv class screenshot above, the RpcHandler contains the Dispatcher

private val transportContext = new TransportContext(transportConf,
  new NettyRpcHandler(dispatcher, this, streamManager))

Looking at the RpcHandler implementation classes, the one we care about is NettyRpcHandler, which lives in the same package

Summary:
  • The Rpc environment uses the Java serializer, because a Java serializer instance is safe to use from multiple threads
  • TransportServer is the real transport layer service, and the RpcHandler under TransportServer contains the Dispatcher

Now let’s record the known classes and call relationships

2.2.3 Trace process 03: RpcEnv source code, part 2, TransportServer call tracing

Back in TransportServer.init(), note that appRpcHandler is converted to rpcHandler and passed to the initializePipeline() method. Below we trace this code path specifically to find out what the Dispatcher is for; the code details are not described

Step into the TransportContext.createChannelHandler() method. Leaving the Netty code aside, my personal understanding of message transmission is that it needs a client, a request, and a response: the client sends a request to the server, and the server returns a response to the client after receiving the request

Next comes TransportRequestHandler. A handler is where Netty handles messages in detail. Here we focus on the handle() method

Step into the processOneWayMessage() method and we see the familiar RpcHandler. Checking its concrete implementation brings us back to the familiar NettyRpcHandler

Going back to the NettyRpcHandler.receive() method, we can see this clearly

dispatcher.postOneWayMessage(messageToDispatch)

Back to the Dispatcher class

Step into the Dispatcher.postOneWayMessage() method. At this point our tracing has its answer: when the transport layer delivers a message, it eventually calls Dispatcher.postMessage(), which hands the message over to the Dispatcher

Summary:
  • The TransportServer transport service is implemented with Netty underneath. Once the TransportServer source code has been traced to TransportServer.init(), the transport layer service has been started
  • When the TransportServer transport service delivers a message, it ends up calling Dispatcher.postMessage(), so the message is in turn processed by the Dispatcher
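The call chain traced above is pure delegation, which a few lines of Scala can make visible. This is only a sketch under stated assumptions: every class name ending in Sketch is hypothetical, messages are plain strings, and the "endpoint:content" format is invented here so that the handler has something to route on. None of these are Spark's real signatures.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the RpcHandler abstraction.
trait RpcHandlerSketch {
  def receive(message: String): Unit
}

// Hypothetical stand-in for the Dispatcher side of the chain.
class DispatcherSketch {
  val posted = ArrayBuffer.empty[(String, String)]

  // One-way messages funnel into the shared postMessage path.
  def postOneWayMessage(endpointName: String, content: String): Unit =
    postMessage(endpointName, content)

  private def postMessage(endpointName: String, content: String): Unit =
    posted += ((endpointName, content))
}

// Plays the role of NettyRpcHandler: it only forwards to the dispatcher.
class NettyRpcHandlerSketch(dispatcher: DispatcherSketch) extends RpcHandlerSketch {
  override def receive(message: String): Unit = {
    // Assumed wire format for this sketch only: "<endpoint>:<content>"
    val sep = message.indexOf(':')
    require(sep >= 0, s"Malformed message: $message")
    dispatcher.postOneWayMessage(message.substring(0, sep), message.substring(sep + 1))
  }
}

// Plays the role of TransportRequestHandler on the transport layer.
class TransportRequestHandlerSketch(rpcHandler: RpcHandlerSketch) {
  def handle(request: String): Unit = processOneWayMessage(request)

  private def processOneWayMessage(request: String): Unit =
    rpcHandler.receive(request)
}

object CallChainDemo {
  def run(): List[(String, String)] = {
    val dispatcher = new DispatcherSketch
    val transport = new TransportRequestHandlerSketch(new NettyRpcHandlerSketch(dispatcher))
    transport.handle("Master:RegisterWorker")
    dispatcher.posted.toList
  }
}
```

The transport handler never sees the dispatcher directly; it only knows the RpcHandler abstraction, which is why the trace had to pass through NettyRpcHandler to reach the Dispatcher.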

Now let’s record the known classes and call relationships

2.2.4 Trace process 04: RpcEnv source code, part 3, the Dispatcher

We already know that the RpcHandler under TransportServer contains the Dispatcher. Now let's look at the Dispatcher itself. Opening it reveals two important pieces of data: the Inbox (mailbox), which stores an endpoint's messages, and receivers, which tracks the endpoints whose inboxes may contain messages. We can understand receivers as the message queue, exactly as we guessed at the beginning.

private class EndpointData(
    val name: String,
    val endpoint: RpcEndpoint,
    val ref: NettyRpcEndpointRef) {
  val inbox = new Inbox(ref, endpoint) // Mailbox: the actual location where messages are stored
}

// Message queue
// Track the receivers whose inboxes may contain messages.
private val receivers = new LinkedBlockingQueue[EndpointData]

// Store all endpoint information
private val endpoints: ConcurrentMap[String, EndpointData] =
  new ConcurrentHashMap[String, EndpointData]
// Store references to all endpoints
private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] =
  new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]

Another important property of the Dispatcher is the thread pool that processes the message queue. It is filled with MessageLoop tasks, so the MessageLoop threads are the core threads for message processing

/** Thread pool used for dispatching messages. */
private val threadpool: ThreadPoolExecutor = {
  val availableCores =
    if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
  val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
    math.max(2, availableCores))
  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
  for (i <- 0 until numThreads) {
    pool.execute(new MessageLoop)
  }
  pool
}

When the Dispatcher is created, numThreads MessageLoop threads are created to process the data in the receivers queue

// Keep taking endpoints off the queue until told to stop
while (true) {
  try {
    // Fetch the endpoint data from the queue
    val data = receivers.take()
    // PoisonPill is the stop signal, not a real endpoint
    if (data == PoisonPill) {
      // Put PoisonPill back so that other MessageLoops can see it.
      receivers.offer(PoisonPill)
      return
    }
    // Otherwise, take the endpoint's Inbox and call its process method
    data.inbox.process(Dispatcher.this)
  } catch {
    case NonFatal(e) => logError(e.getMessage, e)
  }
}

Where data is taken from the message queue, there must also be a place where data is put into it. Going back to the Dispatcher.postMessage() method, the logic is as follows

// Fetch endpoint data by name
val data = endpoints.get(endpointName)
if (stopped) {
  Some(new RpcEnvStoppedException())
} else if (data == null) {
  Some(new SparkException(s"Could not find $endpointName."))
} else {
  // Put the message into the mailbox of the endpoint
  data.inbox.post(message)
  // Put the endpoint into the message queue so that a MessageLoop thread can later pick it up for processing
  receivers.offer(data)
  None
}
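The two halves seen so far, MessageLoop taking endpoints from receivers and postMessage() putting them in, can be exercised together outside Spark. The following is a minimal sketch under stated assumptions, not the real Dispatcher: the class names ending in Sketch, the String message type, the register() helper, and the handled queue are all invented for illustration, and the inbox is reduced to a plain concurrent queue.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue, Executors, LinkedBlockingQueue, ThreadFactory, TimeUnit}

// Hypothetical, simplified endpoint record: an inbox plus a log of processed messages.
class EndpointDataSketch(val name: String) {
  val inbox = new ConcurrentLinkedQueue[String]()   // pending messages
  val handled = new ConcurrentLinkedQueue[String]() // messages a loop has processed
}

class DispatcherLoopSketch(numThreads: Int) {
  private val endpoints = new ConcurrentHashMap[String, EndpointDataSketch]()
  private val receivers = new LinkedBlockingQueue[EndpointDataSketch]()
  private val poisonPill = new EndpointDataSketch("poison-pill")
  private val pool = Executors.newFixedThreadPool(numThreads, new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "dispatcher-event-loop")
      t.setDaemon(true) // daemon threads never keep the JVM alive
      t
    }
  })

  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      var running = true
      while (running) {
        val data = receivers.take() // blocks until some endpoint has mail
        if (data eq poisonPill) {
          receivers.offer(poisonPill) // let the other loops see it too
          running = false
        } else {
          var msg = data.inbox.poll()
          while (msg != null) {
            data.handled.add(msg)
            msg = data.inbox.poll()
          }
        }
      }
    }
  }

  for (_ <- 0 until numThreads) pool.execute(new MessageLoop)

  def register(name: String): EndpointDataSketch = {
    val data = new EndpointDataSketch(name)
    endpoints.put(name, data)
    data
  }

  /** Returns an error description, or None when the message was queued. */
  def postMessage(endpointName: String, message: String): Option[String] = {
    val data = endpoints.get(endpointName)
    if (data == null) {
      Some(s"Could not find $endpointName.")
    } else {
      data.inbox.add(message) // 1. the message goes into the endpoint's inbox
      receivers.offer(data)   // 2. the endpoint goes into the shared queue
      None
    }
  }

  /** Sends the stop signal and waits for every loop to exit. */
  def stop(): Boolean = {
    receivers.offer(poisonPill)
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
  }
}

object DispatcherLoopDemo {
  def run(): (Option[String], Set[String]) = {
    val dispatcher = new DispatcherLoopSketch(numThreads = 2)
    val master = dispatcher.register("Master")
    Seq("m1", "m2", "m3").foreach(m => dispatcher.postMessage("Master", m))
    val error = dispatcher.postMessage("Nobody", "lost")
    dispatcher.stop()
    var handled = Set.empty[String]
    val it = master.handled.iterator()
    while (it.hasNext) handled += it.next()
    (error, handled)
  }
}
```

Note that the shared queue holds endpoints, not messages. A message always lands in its endpoint's inbox first, and the queue only tells the loops which inbox is worth looking at. The demo collects results into a Set because, with two loop threads, the order in which messages are recorded is not guaranteed in this sketch.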

Taking a look at the Inbox class, you can see that all messages are stored in the messages property, and that the inbox.synchronized {} block runs when a new Inbox() is initialized

protected val messages = new java.util.LinkedList[InboxMessage]() // Actual messages

// This is an interesting piece of code that relates to the Master, which we will explain later
// OnStart should be the first message to process
inbox.synchronized {
  // The first message an inbox receives, at creation time, is the case object OnStart
  messages.add(OnStart)
}

The Inbox.process() method is used to process messages

var message: InboxMessage = null
inbox.synchronized {
  // Take a message from the message list
  message = messages.poll()
}
while (true) {
  // Match on the message type
  message match {
    case RpcMessage(_sender, content, context) => endpoint.receiveAndReply()
    case OneWayMessage(_sender, content) => endpoint.receive()
    case OnStart => endpoint.onStart() // Call the endpoint's onStart() method
    case OnStop => endpoint.onStop() // Call the endpoint's onStop() method
    // ... => endpoint.xxx()
  }
}

As the code logic shows, each message type ends up calling the corresponding method of the endpoint. Among them, onStart() and onStop() are the core methods of the endpoint; they can be understood literally as starting and stopping the service
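The excerpt above is abbreviated, so here is a small self-contained sketch of the same idea: an inbox that queues OnStart at construction time and routes each message to an endpoint callback by pattern matching. All names ending in Sketch, as well as RecordingEndpoint, are assumptions made for this example; Spark's real Inbox also handles replies, errors, and concurrency limits, which are left out.

```scala
import java.util.LinkedList
import scala.collection.mutable.ListBuffer

sealed trait InboxMessageSketch
case object OnStartSketch extends InboxMessageSketch
case object OnStopSketch extends InboxMessageSketch
final case class OneWaySketch(content: String) extends InboxMessageSketch

// Hypothetical endpoint that just records which callbacks fired, in order.
class RecordingEndpoint {
  val calls = ListBuffer.empty[String]
  def onStart(): Unit = calls += "onStart"
  def receive(content: String): Unit = calls += s"receive($content)"
  def onStop(): Unit = calls += "onStop"
}

class InboxSketch(endpoint: RecordingEndpoint) {
  private val messages = new LinkedList[InboxMessageSketch]()

  // OnStart is queued at construction time, so it is always processed first.
  messages.synchronized { messages.add(OnStartSketch) }

  def post(message: InboxMessageSketch): Unit =
    messages.synchronized { messages.add(message) }

  /** Drains the inbox, routing each message to the matching endpoint callback. */
  def process(): Unit = {
    var message = messages.synchronized { messages.poll() }
    while (message != null) {
      message match {
        case OnStartSketch => endpoint.onStart()
        case OneWaySketch(content) => endpoint.receive(content)
        case OnStopSketch => endpoint.onStop()
      }
      message = messages.synchronized { messages.poll() }
    }
  }
}

object InboxDemo {
  def run(): List[String] = {
    val endpoint = new RecordingEndpoint
    val inbox = new InboxSketch(endpoint)
    inbox.post(OneWaySketch("hello"))
    inbox.post(OnStopSketch)
    inbox.process()
    endpoint.calls.toList
  }
}
```

Even though the demo never posts a start message itself, the recorded calls begin with onStart, because the inbox put that message in before anyone else could post.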

At this point we have a clear understanding of how the Dispatcher and Inbox process messages

Summary:
  • The containment relationship is as follows:

    TransportServer (transport layer) {
      Dispatcher (dispatcher) {
        Inbox (mailbox) { messages (message list) }
        receivers (message queue)
        endpoints (endpoint collection)
        endpointRefs (endpoint reference collection)
      }
    }

  • The Dispatcher also holds a collection of endpoint information and a collection of endpoint reference information (endpoints and endpoint references are explained below).

  • The Dispatcher processes messages through a thread pool: it starts multiple MessageLoop threads that actively pull data from the message queue, and the messages are processed by calling the Inbox.process() method

  • The processing logic of the Inbox.process() method ultimately calls the endpoint’s various methods for processing

Now let’s record the known classes and call relationships

2.2.5 Trace process 05: RpcEnv source code, part 4, the RpcEndpoint endpoint and the RpcEndpointRef endpoint reference

From the Dispatcher source screenshots above, we know that besides the Inbox there are two more important classes: the endpoint class RpcEndpoint and the endpoint reference class NettyRpcEndpointRef

RpcEndpoint is an interface (a Scala trait) with a single property, rpcEnv. Its comments show that it defines which function is triggered for which message; that is the purpose of this interface

Next, look at the methods defined in it. onStart(), onStop(), receive(), and receiveAndReply() correspond to the endpoint methods called in Inbox.process()

Look again at the comments on the receive() and receiveAndReply() methods: both receive messages sent through the endpoint reference class and process them based on their type

Next we jump to the endpoint reference class RpcEndpointRef, which is an abstract class. Its main property is address: RpcAddress, and it defines the methods for communicating with the endpoint class

The RpcAddress class wraps the address of the endpoint, such as the host and the port number

Summary:
  • So far, we know that RpcEndpoint and RpcEndpointRef exist in pairs
    • The RpcEndpointRef reference class stores the address information of an RpcEndpoint and defines the methods for communicating with it. Whoever holds the reference, and therefore the address information, can communicate with that RpcEndpoint and send it messages
    • RpcEndpoint receives messages and defines how they are processed: a specific method is executed depending on the type of the received message
  • RpcEndpoint and RpcEndpointRef define the types and methods of communication respectively
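The pairing summarized above can be illustrated with a small in-process sketch. Everything here is an assumption made for illustration: the names ending in Sketch are hypothetical, LocalEnvSketch delivers messages with a direct method call instead of a network hop, and only a one-way send is modeled. The one detail taken from the source is that an endpoint expresses its message handling as a receive partial function.

```scala
import scala.collection.mutable

// Hypothetical stand-in for RpcAddress: where an endpoint lives.
final case class AddressSketch(host: String, port: Int)

// The receiving side: decides what to do for each message type.
trait EndpointSketch {
  def receive: PartialFunction[Any, Unit]
}

// The sending side: knows only the address and name, never the endpoint object itself.
class EndpointRefSketch(val address: AddressSketch, val name: String, env: LocalEnvSketch) {
  def send(message: Any): Unit = env.deliver(address, name, message)
}

// Stand-in for an Rpc environment that resolves (address, name) to an endpoint.
class LocalEnvSketch {
  private val endpoints = mutable.Map.empty[(AddressSketch, String), EndpointSketch]

  def setupEndpoint(address: AddressSketch, name: String, endpoint: EndpointSketch): EndpointRefSketch = {
    endpoints((address, name)) = endpoint
    new EndpointRefSketch(address, name, this)
  }

  def deliver(address: AddressSketch, name: String, message: Any): Unit =
    endpoints.get((address, name)).foreach { endpoint =>
      // Messages the endpoint has no case for are silently dropped in this sketch
      if (endpoint.receive.isDefinedAt(message)) endpoint.receive(message)
    }
}

final case class RegisterWorkerSketch(id: String)

class MasterSketch extends EndpointSketch {
  val workers = mutable.ListBuffer.empty[String]
  override def receive: PartialFunction[Any, Unit] = {
    case RegisterWorkerSketch(id) => workers += id
  }
}

object EndpointRefDemo {
  def run(): List[String] = {
    val env = new LocalEnvSketch
    val master = new MasterSketch
    val masterRef = env.setupEndpoint(AddressSketch("localhost", 7077), "Master", master)
    masterRef.send(RegisterWorkerSketch("worker-1")) // handled by the matching case
    masterRef.send("no matching case")               // ignored
    master.workers.toList
  }
}
```

A worker in this sketch would hold only masterRef. It never touches the MasterSketch object, which mirrors how a reference carries the address while the endpoint carries the behavior.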

Now let’s record the known classes and call relationships

2.2.6 Trace process 06: starting the Master

With the above, we now understand how the RpcEnv environment is prepared and how the roles call each other. Let's summarize the source code so far with a diagram

  • Transport layer data is finally put into the message queue via Dispatcher.postMessage()
  • In the Dispatcher, multiple threads poll the message queue and process its data
  • The message is ultimately processed by calling the endpoint’s methods
  • RpcEnv is the basis for communication. All roles that need to communicate must communicate on RpcEnv

Now that we’ve tracked the process of preparing the RpcEnv environment, let’s move on to the Master code

val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
  new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))

Step into the rpcEnv.setupEndpoint() method and then into its concrete implementation in NettyRpcEnv. You can clearly see that it calls the Dispatcher.registerRpcEndpoint() method, whose second parameter must be an RpcEndpoint. So the Master we want to start must be an implementation of RpcEndpoint. Similarly, we can guess that every role involved in communication, such as Worker and Driver, must also be an implementation of RpcEndpoint

Step into the Dispatcher.registerRpcEndpoint() method. This method mainly registers the endpoint with the RpcEnv environment and starts the endpoint instance. The code logic is as follows

  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      if (stopped) {
        throw new IllegalStateException("RpcEnv has been stopped")
      }
      // Check whether the endpoint is already registered. If not, wrap it in an EndpointData and store it
      if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      }
      // Get the endpoint data by name
      val data = endpoints.get(name)
      // Put the endpoint reference into the collection
      endpointRefs.put(data.endpoint, data.ref)

      // Put the endpoint on the message queue. This is important: the Master is also started through this call
      receivers.offer(data)  // for the OnStart message
    }
    endpointRef
  }

In new Inbox(), the first message put into messages is the case object OnStart, which is what starts the endpoint service.

It is clear that whenever an endpoint is registered with an RpcEnv environment, its onStart() method is invoked asynchronously by a MessageLoop thread. This is why so much work is spent preparing the RpcEnv environment
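The claim that registration returns immediately while onStart() runs on another thread is easy to check in isolation. The following is a minimal sketch and not Spark code: AsyncRegistrySketch, StartableEndpoint, and AsyncStartDemo are hypothetical names, and a single-thread executor stands in for the MessageLoop threads.

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicReference

trait StartableEndpoint {
  def onStart(): Unit
}

// Simplified stand-in: one worker thread plays the role of a MessageLoop.
class AsyncRegistrySketch {
  private val loop = Executors.newSingleThreadExecutor()

  /** Returns immediately; onStart() runs later on the loop thread. */
  def register(endpoint: StartableEndpoint): Unit =
    loop.execute(new Runnable {
      override def run(): Unit = endpoint.onStart()
    })

  def stop(): Unit = loop.shutdown()
}

object AsyncStartDemo {
  /** True when onStart() ran, and ran on a thread other than the registering one. */
  def startedOnAnotherThread(): Boolean = {
    val started = new CountDownLatch(1)
    val startThread = new AtomicReference[Thread]()
    val registry = new AsyncRegistrySketch
    registry.register(new StartableEndpoint {
      override def onStart(): Unit = {
        startThread.set(Thread.currentThread()) // remember who ran the callback
        started.countDown()
      }
    })
    val finished = started.await(5, TimeUnit.SECONDS)
    registry.stop()
    finished && (startThread.get() ne Thread.currentThread())
  }
}
```

The registering thread only enqueues work. Whether the endpoint has finished starting is something it has to wait for explicitly, which is what the latch does here.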

Back in the Master class, checking the class inheritance diagram confirms our previous guess: roles such as Master and Worker are implementation classes of RpcEndpoint

So, in pseudocode, starting the Master looks like this

// The TransportServer and Dispatcher are ready for Rpc communication
val rpcEnv = RpcEnv.create()

// Create the Master endpoint class
val master = new Master()

// Register the Master endpoint with the Rpc environment and start asynchronously
// This is the first step to start the Master service
rpcEnv.setupEndpoint(NAME, master)

Let’s talk about why we want to start the service asynchronously, using pseudocode to explain:

import java.util.concurrent.Executors

// Three startup tasks; suppose task2 takes a long time to run
val task1 = new Runnable { def run(): Unit = println("service 1 started") }
val task2 = new Runnable { def run(): Unit = { Thread.sleep(2000); println("service 2 started") } }
val task3 = new Runnable { def run(): Unit = println("service 3 started") }

// Method 1:
// The code executes linearly, from the top down
// task3 could have started right away, but now it has to wait for task2 to complete
task1.run()
task2.run()
task3.run()

// Method 2:
// Execute asynchronously with the help of a queue and multiple threads
// Each task is run by a thread in the thread pool, without waiting for the previous task to finish
val executor = Executors.newFixedThreadPool(3)
Seq(task1, task2, task3).foreach(task => executor.execute(task))
executor.shutdown()

At this point, we have tracked the entire startup process of the Master, and we will update the Master into our source summary diagram

Summary:
  • RpcEnv is the basis for communication. All roles that need to communicate must communicate on RpcEnv
  • Whenever an endpoint is registered with an RpcEnv environment, its onStart() method is invoked asynchronously by a thread from the Dispatcher's thread pool
  • As long as roles such as Master and Worker are involved in communication, the RpcEnv environment must exist

3. Summary

  • RpcEnv is the basis of Spark communication. A role can only communicate if an RpcEnv environment is available. Currently, Spark's Rpc is implemented only with Netty
  • Starting the Master takes two steps. The first is to prepare the RpcEnv environment. The second is to create the Master and register it with the RpcEnv environment
  • Messages are delivered by calling the Dispatcher.postMessage() method, which puts the message into the endpoint's Inbox and the endpoint into the receivers message queue, which the MessageLoop threads poll
  • Spark uses RpcEndpoint and RpcEndpointRef as its Rpc communication rules. For example, if the Worker wants to communicate with the Master, the Worker only needs to hold the Master's RpcEndpointRef in the RpcEnv environment