

Learning materials

  • Spark2.0.2 Source code analysis – RPC communication mechanism (message processing)
  • Spark network communication – Implementation of RPC
  • Spark message communication architecture
  • In-depth analysis of Spark RPC *
  • Spark RPC Communication mechanism
  • Spark2.1.0 – The built-in RPC framework *
  • Spark RPC implementation principle analysis
  • Spark RPC Dispatcher, Inbox, and Outbox

My ability is limited and I am still learning, so this post simply records how I studied the source code of the RPC module. Along the way I found the materials above useful and recommend them to everyone; those marked with * are worth reading first.

Because each resource has its own focus, some classes and parts of the architecture are not covered in detail by any single one of them. Below I have summarized the information from all of them so that it can be cross-referenced while learning.

Important concepts

  • RpcEnv: The RpcEnv abstract class represents an RPC environment and manages the entire life cycle of RpcEndpoints. Every RpcEndpoint runs inside an RpcEnv.

  • NettyRpcEnv: the only implementation of RpcEnv.

  • RpcEndpoint: an RPC endpoint. Spark calls each communicating entity an RPC endpoint, and each one implements the RpcEndpoint trait; examples include DriverEndpoint and the Master endpoint. Each endpoint defines its own messages and handling logic according to its requirements.

  • Dispatcher: the message dispatcher, which routes RpcMessages to the corresponding RpcEndpoint. When a message arrives, the Dispatcher finds the target endpoint's Inbox by the endpoint name the sender specified, posts the message into that Inbox, and puts the endpoint's EndpointData into a LinkedBlockingQueue. MessageLoop tasks running in the Dispatcher's thread pool take entries from this queue and consume the Inbox messages. Because it is a blocking queue, a MessageLoop simply blocks when there is no work and starts working as soon as a message arrives.

  • EndpointData: each registered endpoint has a corresponding EndpointData, which holds the endpoint name, the RpcEndpoint, its NettyRpcEndpointRef, and an Inbox. The Inbox keeps a linked list of InboxMessages. A message sent to the endpoint is appended to this list, and the EndpointData is then added to the Dispatcher's blocking queue, where it is processed asynchronously by the Dispatcher's threads.

  • Inbox: each local endpoint has one Inbox, which holds a linked list of InboxMessages. InboxMessage has several subclasses: an RpcMessage from a remote caller, a fire-and-forget OneWayMessage from a remote caller, or a message signalling an event such as the endpoint starting or stopping, or a connection being established or dropped. The Inbox pattern-matches on each message and calls the corresponding RpcEndpoint method.

  • RpcEndpointRef: a reference to an RpcEndpoint (possibly a remote one) through which messages can be sent to that endpoint.

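  • NettyRpcEndpointRef: the only implementation of RpcEndpointRef, i.e. the NettyRpcEnv version of RpcEndpointRef. Its behavior depends on where it was created. On the node that "owns" the RpcEndpoint, it is a simple wrapper around an RpcEndpointAddress instance. On other nodes that receive a serialized copy of the reference, it also keeps track of the TransportClient that sent it, so messages can be sent over that existing connection.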

  • RpcEndpointAddress: contains the RpcAddress (host and port) and the name of the RPC endpoint.

  • Outbox: there is one Outbox per remote address (RpcAddress); NettyRpcEnv holds a ConcurrentHashMap[RpcAddress, Outbox]. Messages put into an Outbox are then sent out through a TransportClient.

  • TransportContext: the context for creating TransportServer and TransportClientFactory, and for setting up Netty channel pipelines with a TransportChannelHandler. TransportClient provides two communication protocols: control-plane RPCs and data-plane chunk fetching. RPC handling is done by the RpcHandler that the user passes in through the constructor, and that RpcHandler is also responsible for setting up streams that can be transferred through the data plane in chunks using zero-copy IO. TransportServer and TransportClientFactory both create a TransportChannelHandler for each channel. Each TransportChannelHandler contains a TransportClient, which lets the server process send messages back to the client over the existing channel.

  • TransportServer: the server side of the RPC framework, providing efficient, low-level streaming services.

  • TransportServerBootstrap: defines the bootstrap that the server runs on a client's channel once that client has connected, so the channel can be customized (for example, for SASL authentication). It is used to initialize the TransportServer.

  • TransportClientFactory: the factory class that creates TransportClient instances.

  • TransportClient: the client of the RPC framework, used to fetch consecutive chunks of a pre-negotiated stream. It is designed for efficient transfer of large amounts of data, which is broken up into chunks ranging from a few hundred kilobytes to a few megabytes. In short, TransportClient is the low-level client class of Spark RPC: it sends RPC requests to the server and fetches stream chunks from it.

  • TransportClientBootstrap: a bootstrap executed on a TransportClient before it is handed to the user, performing one-time preparation for the connection (such as authentication or encryption). These operations are often expensive, but because connections are reused, the cost is paid only once per connection. It is used to initialize the TransportClient.

  • TransportChannelHandler: the transport-layer handler that delegates requests to TransportRequestHandler and responses to TransportResponseHandler. All channels created in the transport layer are bidirectional. When a client opens a Netty channel with a RequestMessage (handled by the server's RequestHandler), the server produces a ResponseMessage (handled by the client's ResponseHandler). However, the server also holds a handle to the same channel, so it may start sending RequestMessages to the client. This means the client also needs a RequestHandler, and the server needs a ResponseHandler for the client's responses to its requests. This class also handles timeouts signalled by io.netty.handler.timeout.IdleStateHandler: if there is an outstanding fetch or RPC request but no traffic on the channel for at least requestTimeoutMs, the connection is considered timed out. Note that traffic is counted in both directions, so if the client keeps sending but receives no responses, the connection does not time out.

    If TransportChannelHandler reads a RequestMessage, it hands the request to TransportRequestHandler; if it reads a ResponseMessage, it hands the message to TransportResponseHandler.

  • TransportResponseHandler: a handler that processes the server's responses to requests issued by a TransportClient, tracking the outstanding requests and their callbacks.

  • TransportRequestHandler: a handler that processes requests from clients and writes chunk data back.

  • MessageEncoder: encodes a message before it is written to the channel, so that the reader at the other end can delimit and parse messages correctly.

  • MessageDecoder: decodes each frame read from the channel back into a message, so that it can be parsed correctly;

  • TransportFrameDecoder: splits the ByteBuf data read from the channel into complete data frames;

  • StreamManager: Handles ChunkFetchRequest and StreamRequest requests

  • RpcHandler: Handles RpcRequest and OneWayMessage requests

  • Message: the abstract interface for all messages. Every message class implements the RequestMessage or ResponseMessage interface, directly or indirectly. RequestMessage has four concrete implementations:

    • ChunkFetchRequest: a request to fetch a single chunk of a stream. ChunkFetch messages abstract all the messages that are transferred when Spark pulls data.
    • RpcRequest: handled by the remote RPC server; an RPC request that requires the server to reply to the client.
    • OneWayMessage: This message also needs to be processed by the remote RPC server. Unlike RpcRequest, no reply is required from the server to the client.
    • StreamRequest: a request to a remote service to stream data. Stream messages are used to transfer jars and files from the driver to executors.

    Since OneWayMessage requires no response, ResponseMessage has a success and a failure implementation for each of the other three request types:

    • ChunkFetchSuccess: returned after a ChunkFetchRequest is processed successfully;
    • ChunkFetchFailure: returned when processing a ChunkFetchRequest fails;
    • RpcResponse: returned after an RpcRequest is processed successfully;
    • RpcFailure: returned when processing an RpcRequest fails;
    • StreamResponse: returned after a StreamRequest is processed successfully;
    • StreamFailure: returned when processing a StreamRequest fails.
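
To make the request/response pairing above concrete, here is a minimal Java sketch. The class names mirror the list, but the classes themselves are hypothetical empty stand-ins, not Spark's real implementations (which also carry request ids, stream ids and body buffers), and `ResponseRules.respond` is a hypothetical helper. It only shows which response type answers which request, and that OneWayMessage gets none.

```java
import java.util.Optional;

// Hypothetical, simplified stand-ins for the message classes listed above.
interface Message {}
interface RequestMessage extends Message {}
interface ResponseMessage extends Message {}

// The four RequestMessage types.
final class ChunkFetchRequest implements RequestMessage {}
final class RpcRequest implements RequestMessage {}
final class OneWayMessage implements RequestMessage {}
final class StreamRequest implements RequestMessage {}

// A success/failure pair for each request type that expects an answer.
final class ChunkFetchSuccess implements ResponseMessage {}
final class ChunkFetchFailure implements ResponseMessage {}
final class RpcResponse implements ResponseMessage {}
final class RpcFailure implements ResponseMessage {}
final class StreamResponse implements ResponseMessage {}
final class StreamFailure implements ResponseMessage {}

final class ResponseRules {
    // Picks the response a server would send back for a request.
    // OneWayMessage is fire-and-forget, so it gets no response at all.
    static Optional<ResponseMessage> respond(RequestMessage request, boolean succeeded) {
        if (request instanceof OneWayMessage) {
            return Optional.empty();
        } else if (request instanceof ChunkFetchRequest) {
            return pick(succeeded, new ChunkFetchSuccess(), new ChunkFetchFailure());
        } else if (request instanceof RpcRequest) {
            return pick(succeeded, new RpcResponse(), new RpcFailure());
        } else if (request instanceof StreamRequest) {
            return pick(succeeded, new StreamResponse(), new StreamFailure());
        }
        throw new IllegalArgumentException("Unknown request type: " + request);
    }

    private static Optional<ResponseMessage> pick(boolean ok, ResponseMessage success, ResponseMessage failure) {
        return Optional.of(ok ? success : failure);
    }
}
```

For example, `respond(new RpcRequest(), false)` yields an RpcFailure, while `respond(new OneWayMessage(), true)` yields an empty Optional.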

Important diagrams

Spark RPC architecture diagram

An RpcEndpointRef can be a simple wrapper around a local RpcEndpoint or a proxy for a remote one. When a message is sent through an RpcEndpointRef and the ref points to a local endpoint, the message is dispatched by the Dispatcher. If it points to a remote endpoint, the message is wrapped as an OutboxMessage and sent through the local TransportClient to the remote RpcEndpoint.
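
The local-versus-remote decision can be sketched as follows. This is a simplified illustration, not NettyRpcEnv's actual code: `Address`, `SimpleOutbox` and `Router` are hypothetical names, the local Dispatcher is replaced by a list, and the Outbox only records messages. The per-address map uses `ConcurrentHashMap.computeIfAbsent`, matching the idea of one Outbox per RpcAddress.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for RpcAddress (host and port).
final class Address {
    final String host;
    final int port;

    Address(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof Address)) {
            return false;
        }
        Address other = (Address) o;
        return port == other.port && host.equals(other.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port);
    }
}

// Hypothetical Outbox that only records what it would send.
final class SimpleOutbox {
    final List<String> pending = Collections.synchronizedList(new ArrayList<>());

    void send(String message) {
        pending.add(message);
    }
}

final class Router {
    private final Address localAddress;
    // Stands in for the Dispatcher: messages delivered to local endpoints.
    final List<String> localDeliveries = Collections.synchronizedList(new ArrayList<>());
    // One Outbox per remote address, created on first use.
    final ConcurrentMap<Address, SimpleOutbox> outboxes = new ConcurrentHashMap<>();

    Router(Address localAddress) {
        this.localAddress = localAddress;
    }

    void send(Address target, String endpointName, String message) {
        String envelope = endpointName + ":" + message;
        if (localAddress.equals(target)) {
            localDeliveries.add(envelope); // local: hand straight to the "Dispatcher"
        } else {
            // remote: queue in the Outbox for that address
            outboxes.computeIfAbsent(target, a -> new SimpleOutbox()).send(envelope);
        }
    }
}
```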

NettyRpcEnv structure

The basic architecture of Spark’s built-in RPC framework

TransportContext contains the configuration TransportConf and the RpcHandler that processes client request messages. TransportConf is required to create both TransportClientFactory and TransportServer, whereas RpcHandler is only used to create TransportServer. TransportClientFactory is the factory class for RPC clients, and TransportServer is the implementation of the RPC server. The numbered tokens in the figure mean the following:

Token 1: calling TransportContext's createClientFactory method creates a TransportClientFactory instance. A list of client bootstraps (TransportClientBootstrap) is passed in when the factory is constructed. In addition, TransportClientFactory keeps a connection pool, ClientPool, for each socket address. A ClientPool holds an array of TransportClients (clients) and an array of lock objects (locks), and each lock guards the client at the same array index. Because a thread only contends for the lock of the slot it picks, lock contention under concurrency is reduced, which reduces blocking and increases concurrency.

Token 2: calling TransportContext's createServer method creates a TransportServer instance. Its constructor takes the TransportContext, the host, the port, the RpcHandler, and a list of server bootstraps (TransportServerBootstrap).
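
The lock-striping idea behind ClientPool (Token 1) can be sketched like this. `Conn`, `ClientPool` and `ClientFactory` are hypothetical simplifications of TransportClient, ClientPool and TransportClientFactory, and no real network connection is opened. The sketch assumes a random slot is picked per call, and it uses `AtomicReferenceArray` so that the lock-free fast-path read is safe under the Java memory model.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReferenceArray;

// Hypothetical connection type standing in for TransportClient.
final class Conn {
    private volatile boolean active = true;
    boolean isActive() { return active; }
    void close() { active = false; }
}

// Per-address pool: the client in slot i is guarded by locks[i].
final class ClientPool {
    final AtomicReferenceArray<Conn> clients;
    final Object[] locks;

    ClientPool(int size) {
        clients = new AtomicReferenceArray<>(size);
        locks = new Object[size];
        for (int i = 0; i < size; i++) {
            locks[i] = new Object();
        }
    }
}

final class ClientFactory {
    private final ConcurrentHashMap<String, ClientPool> pools = new ConcurrentHashMap<>();
    private final int connectionsPerPeer;
    final AtomicInteger created = new AtomicInteger(); // how many connections were opened

    ClientFactory(int connectionsPerPeer) {
        this.connectionsPerPeer = connectionsPerPeer;
    }

    Conn createClient(String address) {
        ClientPool pool = pools.computeIfAbsent(address, a -> new ClientPool(connectionsPerPeer));
        int slot = ThreadLocalRandom.current().nextInt(connectionsPerPeer);

        // Fast path: reuse a live client without taking any lock.
        Conn cached = pool.clients.get(slot);
        if (cached != null && cached.isActive()) {
            return cached;
        }
        // Slow path: only threads that picked the same slot contend for this lock.
        synchronized (pool.locks[slot]) {
            cached = pool.clients.get(slot);
            if (cached != null && cached.isActive()) {
                return cached; // another thread connected while we waited
            }
            Conn fresh = new Conn(); // a real factory would open a network connection here
            created.incrementAndGet();
            pool.clients.set(slot, fresh);
            return fresh;
        }
    }
}
```

Because each slot has its own lock, two threads that pick different slots never block each other, while two threads that pick the same empty slot open only one connection between them.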

Pipeline request-processing diagram
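
Before any handler sees a message, the pipeline has to deal with the fact that TCP delivers a byte stream rather than discrete messages, which is what MessageEncoder and TransportFrameDecoder in the concepts list are for. Below is a minimal length-prefixed framing sketch. The frame layout ([8-byte frame length, counting the length field itself][1-byte message type][body]) is an assumption for illustration; `Frame`, `FrameCodec` and `FrameDecoder` are hypothetical names, and Spark itself works on Netty `ByteBuf`s rather than byte arrays.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// One decoded frame: a message-type tag plus the raw body bytes.
final class Frame {
    final byte type;
    final byte[] body;

    Frame(byte type, byte[] body) {
        this.type = type;
        this.body = body;
    }
}

final class FrameCodec {
    static final int LENGTH_SIZE = 8; // size of the frame-length prefix

    // Assumed layout: [frame length (long, includes itself)][type (1 byte)][body].
    static byte[] encode(byte type, byte[] body) {
        long frameLength = LENGTH_SIZE + 1 + body.length;
        ByteBuffer buf = ByteBuffer.allocate((int) frameLength);
        buf.putLong(frameLength);
        buf.put(type);
        buf.put(body);
        return buf.array();
    }
}

// Accumulates bytes from arbitrarily split reads and emits only complete frames.
final class FrameDecoder {
    private final ByteArrayOutputStream pending = new ByteArrayOutputStream();

    List<Frame> feed(byte[] chunk) {
        pending.write(chunk, 0, chunk.length);
        byte[] data = pending.toByteArray();
        List<Frame> out = new ArrayList<>();
        int pos = 0;
        while (data.length - pos >= FrameCodec.LENGTH_SIZE) {
            long frameLength = ByteBuffer.wrap(data, pos, FrameCodec.LENGTH_SIZE).getLong();
            if (frameLength < FrameCodec.LENGTH_SIZE + 1) {
                throw new IllegalStateException("Corrupt frame length: " + frameLength);
            }
            if (data.length - pos < frameLength) {
                break; // frame not complete yet: wait for more bytes
            }
            byte type = data[pos + FrameCodec.LENGTH_SIZE];
            int bodyLength = (int) frameLength - FrameCodec.LENGTH_SIZE - 1;
            byte[] body = new byte[bodyLength];
            System.arraycopy(data, pos + FrameCodec.LENGTH_SIZE + 1, body, 0, bodyLength);
            out.add(new Frame(type, body));
            pos += (int) frameLength;
        }
        // Keep the unconsumed tail for the next read.
        pending.reset();
        pending.write(data, pos, data.length - pos);
        return out;
    }
}
```

Feeding the decoder a frame split across several reads returns nothing until the frame is complete, and two frames arriving in one read come out as two separate messages.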

Flow chart of request processing and response on the RPC server

Client request and response flow chart

Spark Message diagram

Message is the abstract interface for all messages. All Message implementation classes directly or indirectly implement the RequestMessage or ResponseMessage interface.

Server startup sequence diagram

Server response sequence diagram

Phase one is I/O reception. TransportRequestHandler is Netty's callback handler. It parses a complete packet from the wire format and hands it to NettyRpcEnv for deserialization. If it is an RPC call, an RpcMessage is constructed and the RpcHandler's callback method is invoked to process it; internally this calls the Dispatcher, which delivers the RpcMessage into the target endpoint's Inbox.

Phase two is the I/O response. A MessageLoop running in the Dispatcher's thread pool takes the pending RpcMessage and processes it, which in practice means invoking the RpcEndpoint's business logic. The reply is serialized through RpcCallContext, and a callback tells TransportRequestHandler that the message has been processed so that it can write the response back.

Note the benefit of asynchronous processing here: combining the Reactor pattern with Actor-style mailboxes decouples receiving messages from the logic that processes them.
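
The Reactor plus Actor-mailbox combination can be illustrated with a small Dispatcher sketch. `MiniInbox` and `MiniDispatcher` are hypothetical, heavily simplified stand-ins for Inbox and Dispatcher: endpoints are plain `Consumer<String>` callbacks, messages are strings, and there is no reply path through RpcCallContext. As described above, posting a message puts it into the endpoint's Inbox and enqueues that Inbox on a LinkedBlockingQueue, and MessageLoop threads block on the queue until work arrives. A poison-pill entry stops the loops.

```java
import java.util.LinkedList;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical mailbox for one endpoint (stand-in for Inbox).
final class MiniInbox {
    private final LinkedList<String> messages = new LinkedList<>(); // guarded by this
    private final Object processLock = new Object(); // one message at a time per endpoint
    private final Consumer<String> endpoint;

    MiniInbox(Consumer<String> endpoint) {
        this.endpoint = endpoint;
    }

    synchronized void post(String message) {
        messages.add(message);
    }

    // Drains all currently queued messages, calling the endpoint for each in FIFO order.
    void process() {
        synchronized (processLock) {
            while (true) {
                String message;
                synchronized (this) {
                    message = messages.poll();
                }
                if (message == null) {
                    return;
                }
                endpoint.accept(message);
            }
        }
    }
}

// Hypothetical stand-in for Dispatcher: inboxes by name plus a queue of inboxes with work.
final class MiniDispatcher {
    private static final MiniInbox POISON_PILL = new MiniInbox(m -> { });
    private final Map<String, MiniInbox> inboxes = new ConcurrentHashMap<>();
    private final LinkedBlockingQueue<MiniInbox> receivers = new LinkedBlockingQueue<>();
    private final ExecutorService threadPool;
    private final int numThreads;

    MiniDispatcher(int numThreads) {
        this.numThreads = numThreads;
        this.threadPool = Executors.newFixedThreadPool(numThreads);
        for (int i = 0; i < numThreads; i++) {
            threadPool.execute(this::messageLoop);
        }
    }

    void register(String name, Consumer<String> endpoint) {
        inboxes.put(name, new MiniInbox(endpoint));
    }

    // Called from the I/O side: cheap, never runs endpoint logic itself.
    void postMessage(String endpointName, String message) {
        MiniInbox inbox = inboxes.get(endpointName);
        if (inbox == null) {
            throw new IllegalArgumentException("No endpoint named " + endpointName);
        }
        inbox.post(message);
        receivers.offer(inbox); // wakes up a blocked MessageLoop
    }

    private void messageLoop() {
        try {
            while (true) {
                MiniInbox inbox = receivers.take(); // blocks while there is no work
                if (inbox == POISON_PILL) {
                    return;
                }
                inbox.process();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Poison pills queue up behind pending work, so queued messages are processed first.
    void shutdown() {
        for (int i = 0; i < numThreads; i++) {
            receivers.offer(POISON_PILL);
        }
        threadPool.shutdown();
        try {
            threadPool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

The I/O side only pays for `post` and `offer`; the endpoint logic runs on the pool. The per-inbox `processLock` makes each endpoint handle one message at a time, similar in spirit to a thread-safe endpoint.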

Client response sequence diagram

Generally, the client first needs to create an RpcEnv and then obtain an RpcEndpointRef.

Phase one is I/O sending. The RpcEndpointRef performs send or ask. Take send as an example: the message is first serialized and then placed into the Outbox for the target address. The underlying TransportClient then writes the data directly through the Netty API. For a request that expects a reply, a UUID-based ID is generated to identify the message for the callback in the next stage, and a Future is returned, so the client can either block on it or continue with other work.

Phase two is I/O reception. When TransportResponseHandler receives the remote response, it deserializes it and then completes the Future from phase one, finishing the call. All of this happens on the Reactor (I/O) thread, with the Future used to communicate between threads.
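
Correlating responses with pending requests through an ID and a Future can be sketched as follows. `MiniClient` is hypothetical: the `wire` callback stands in for writing to the Netty channel, and `onResponse`/`onFailure` stand in for what the response handler does when an RpcResponse or RpcFailure arrives. The request ID is derived from a random UUID, following the description of phase one.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;

// Hypothetical client that correlates responses with outstanding requests by id.
final class MiniClient {
    private final Map<Long, CompletableFuture<String>> outstanding = new ConcurrentHashMap<>();
    private final BiConsumer<Long, String> wire; // stands in for writing to the channel

    MiniClient(BiConsumer<Long, String> wire) {
        this.wire = wire;
    }

    // Phase one: register a Future under a fresh id, then send the request.
    CompletableFuture<String> ask(String payload) {
        long requestId = Math.abs(UUID.randomUUID().getLeastSignificantBits());
        CompletableFuture<String> future = new CompletableFuture<>();
        outstanding.put(requestId, future); // register before sending
        wire.accept(requestId, payload);
        return future;
    }

    // Phase two, on the I/O thread: a success response completes the matching Future.
    void onResponse(long requestId, String body) {
        CompletableFuture<String> future = outstanding.remove(requestId);
        if (future != null) {
            future.complete(body);
        }
    }

    // A failure response completes the Future exceptionally instead.
    void onFailure(long requestId, String error) {
        CompletableFuture<String> future = outstanding.remove(requestId);
        if (future != null) {
            future.completeExceptionally(new RuntimeException(error));
        }
    }

    int pendingCount() {
        return outstanding.size();
    }
}
```

Registering the Future before writing the request matters: otherwise a fast response could arrive before its ID is in the map and be dropped.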

RPC communication architecture diagram

Dispatcher and Inbox distribute and process requests

Outbox handles requests
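
The key property of an Outbox is that messages for one remote address queue up and only one thread at a time drains them onto the connection. A minimal sketch, assuming the connection already exists: `MiniOutbox` is hypothetical, the `client` callback stands in for TransportClient, and connection setup and network-failure handling are omitted.

```java
import java.util.LinkedList;
import java.util.function.Consumer;

// Hypothetical Outbox for one remote address; `client` stands in for TransportClient.
final class MiniOutbox {
    private final LinkedList<String> messages = new LinkedList<>(); // guarded by this
    private final Consumer<String> client;
    private boolean draining = false; // true while some thread is writing messages out
    private boolean stopped = false;

    MiniOutbox(Consumer<String> client) {
        this.client = client;
    }

    void send(String message) {
        synchronized (this) {
            if (stopped) {
                throw new IllegalStateException("Outbox is stopped");
            }
            messages.add(message);
        }
        drainOutbox();
    }

    // Only one thread drains at a time; others just leave their message queued.
    private void drainOutbox() {
        String message;
        synchronized (this) {
            if (stopped || draining) {
                return;
            }
            message = messages.poll();
            if (message == null) {
                return;
            }
            draining = true;
        }
        while (true) {
            client.accept(message); // write outside the lock; error handling omitted
            synchronized (this) {
                message = stopped ? null : messages.poll();
                if (message == null) {
                    draining = false;
                    return;
                }
            }
        }
    }

    synchronized void stop() {
        stopped = true;
        messages.clear();
    }
}
```

The `draining` flag lets many threads call `send` concurrently while the actual writes stay ordered and single-threaded; a thread that finds another one already draining simply leaves its message in the queue.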