Overview

Aloha is a Scala-based distributed task scheduling and management framework. It provides a plug-in extension mechanism, so it can schedule many different types of tasks. Aloha is typically used as a unified entry point for task management. For example, a data platform may run many kinds of applications, such as Spark jobs, Flink jobs, and ETL tasks, and these tasks need to be managed in a unified way, with changes in task status detected promptly.

Aloha's implementation is based on the task scheduling module of Spark. It modifies Spark's Master and Worker components and adds extension interfaces that make it easy to integrate various types of tasks. The Master supports a high-availability configuration and state recovery, and provides a REST interface for submitting tasks.

Extensions

Different types of applications

In Aloha, a scheduled application is abstracted as the Application interface. To schedule and manage a new type of application, you only need to implement this interface. The life cycle of an Application is managed mainly through start() and shutdown(). When the Application is scheduled to run on a Worker, start() is called first. When the user requests a forced stop, shutdown() is called.

trait Application {
  // Start the application
  def start(): Promise[ExitState]
  // Force a stop
  def shutdown(reason: Option[String]): Unit
  // The description of the submitted application
  def withDescription(desc: ApplicationDescription): Application
  // The working directory in which the application runs
  def withApplicationDir(appDir: File): Application
  // System configuration
  def withAlohaConf(conf: AlohaConf): Application
  // Clean up after the application has finished
  def clean(): Unit
}

You may have noticed that start() returns a Promise. This is because Aloha was originally designed for long-running applications such as Flink jobs and Spark Streaming jobs. For such applications, Future and Promise provide a flexible mechanism for reporting task status: when the task stops, the Worker is notified by a call to Promise.success().
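The notification mechanism can be illustrated in isolation with the standard library alone. The sketch below is not Aloha code: ExitState is a simplified stand-in, and ToyApplication and awaitExit are hypothetical names introduced only for illustration. It shows start() returning immediately while a background thread completes the Promise later.

```scala
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseNotificationSketch {
  // Simplified stand-in for Aloha's ExitState (assumption: the real class differs).
  final case class ExitState(succeeded: Boolean, message: Option[String])

  // A toy long-running application: start() returns at once,
  // and the promise is completed later by a background thread.
  final class ToyApplication(work: () => Int) {
    private val result = Promise[ExitState]()

    def start(): Promise[ExitState] = {
      val runner = new Thread("toy-app-runner") {
        override def run(): Unit = {
          val code = work()
          if (code == 0) result.success(ExitState(true, Some("success")))
          else result.success(ExitState(false, Some(s"exit code $code")))
        }
      }
      runner.setDaemon(true)
      runner.start()
      result
    }
  }

  // What a worker thread could do: block until the application reports its exit state.
  def awaitExit(app: ToyApplication): ExitState =
    Await.result(app.start().future, 5.seconds)
}
```

The caller only ever sees the Future side of the Promise, so it can block on it, as here, or attach a callback instead.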

For example, if you want to start an application by starting a separate process, you can do this:

  // Fields used below. Their declarations are not shown in the original
  // fragment, so the types here are assumed.
  private val result: Promise[ExitState] = Promise[ExitState]()
  private var process: Process = _
  private var stateMonitorThread: Thread = _

  override def start(): Promise[ExitState] = {
    // Start the process
    val processBuilder = getProcessBuilder()
    process = processBuilder.start()
    // Watch the process on a separate thread so that start() returns immediately
    stateMonitorThread = new Thread("app-state-monitor-thread") {
      override def run(): Unit = {
        val exitCode = process.waitFor()
        // The process has exited; complete the promise with the outcome
        if (exitCode == 0) {
          result.success(ExitState(ExitCode.SUCCESS, Some("success")))
        } else {
          result.success(ExitState(ExitCode.FAILED, Some("failed")))
        }
      }
    }
    stateMonitorThread.start()
    result
  }

  override def shutdown(reason: Option[String]): Unit = {
    if (process != null) {
      // Force the process to end
      val exitCode = Utils.terminateProcess(process, APP_TERMINATE_TIMEOUT_MS)
      if (exitCode.isEmpty) {
        logWarning("Failed to terminate process: " + process +
          ". This process will likely be orphaned.")
      }
    }
  }

Custom event listening

In many cases, we want to be notified of changes in task status in real time, for example to send a message when a task completes or fails. Aloha provides an event listener interface for reacting to changes in task status.

trait AlohaEventListener {
  def onApplicationStateChange(event: AppStateChangedEvent): Unit

  def onApplicationRelaunched(event: AppRelaunchedEvent): Unit

  def onOtherEvent(event: AlohaEvent): Unit
}

Custom event listeners are registered dynamically when Aloha starts, and multiple listeners can be registered at the same time.
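As an illustration, a listener that collects alerts for finished or failed applications might look like the sketch below. The event classes here are simplified stand-ins whose fields are assumptions (Aloha's real AppStateChangedEvent and AppRelaunchedEvent may carry different data), and AlertingListener is a hypothetical name.

```scala
import scala.collection.mutable.ArrayBuffer

object ListenerSketch {
  // Simplified stand-ins for Aloha's event types (field names are assumptions).
  sealed trait AlohaEvent
  final case class AppStateChangedEvent(appId: String, state: String) extends AlohaEvent
  final case class AppRelaunchedEvent(appId: String, newAppId: String) extends AlohaEvent

  // Same shape as the listener interface shown above.
  trait AlohaEventListener {
    def onApplicationStateChange(event: AppStateChangedEvent): Unit
    def onApplicationRelaunched(event: AppRelaunchedEvent): Unit
    def onOtherEvent(event: AlohaEvent): Unit
  }

  // Hypothetical listener: records an alert when an application finishes or fails.
  final class AlertingListener extends AlohaEventListener {
    val alerts: ArrayBuffer[String] = ArrayBuffer.empty[String]

    override def onApplicationStateChange(event: AppStateChangedEvent): Unit =
      event.state match {
        case "FINISHED" | "FAILED" =>
          alerts += s"${event.appId} is now ${event.state}"
        case _ => () // other states are ignored
      }

    override def onApplicationRelaunched(event: AppRelaunchedEvent): Unit = {
      alerts += s"${event.appId} relaunched as ${event.newAppId}"
    }

    override def onOtherEvent(event: AlohaEvent): Unit = ()
  }
}
```

In a real deployment the alert would be sent to a messaging system rather than kept in memory.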

Module design

Overall architecture

Aloha's overall implementation is based on Spark, so it also uses a master-slave architecture with two main components, Master and Worker. The Master manages all Workers in the cluster, receives applications submitted by users, and assigns them to Workers. Workers start and stop specific applications and manage their life cycles. Aloha also provides a REST service, which in effect acts as a Client that submits applications on behalf of callers of the REST interface.

Task Scheduling Management

Worker registration

After the Master starts, it waits for registration requests from Workers. When a Worker starts, it sends a registration request to the Master at the configured Master address. Because multiple Master instances may be running, the Worker sends the request to all of them, and only the Master in the Alive state responds with a registration-success message. A Master in the Standby state tells the Worker that it is in Standby, and the Worker ignores that message. The Worker keeps retrying until it receives a successful registration response. The registration request carries the computing resources of the Worker node, including the number of available CPUs and the memory size, and the Master tracks each Worker's resource usage when scheduling.
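The handshake can be sketched as follows. All names here (ToyMaster, RegisteredWorker, MasterInStandby, registerWithMasters) are hypothetical and chosen for illustration; the masters are modeled as in-process objects rather than RPC peers, and the retry loop omits the delay a real Worker would insert between attempts.

```scala
object RegistrationSketch {
  sealed trait RegisterResponse
  final case class RegisteredWorker(masterAddress: String) extends RegisterResponse
  case object MasterInStandby extends RegisterResponse

  // Resources reported by the worker in its registration request.
  final case class WorkerInfo(id: String, cores: Int, memoryMb: Int)

  // Toy master: only an alive master records the worker and confirms registration.
  final class ToyMaster(val address: String, var alive: Boolean) {
    var registered: Map[String, WorkerInfo] = Map.empty

    def register(worker: WorkerInfo): RegisterResponse =
      if (alive) {
        registered += (worker.id -> worker)
        RegisteredWorker(address)
      } else MasterInStandby
  }

  // The worker sends the request to every known master, ignores standby replies,
  // and retries up to maxAttempts times until one master accepts it.
  def registerWithMasters(worker: WorkerInfo,
                          masters: Seq[ToyMaster],
                          maxAttempts: Int): Option[String] =
    Iterator.range(0, maxAttempts)
      .map(_ => masters.map(_.register(worker)).collectFirst {
        case RegisteredWorker(addr) => addr
      })
      .collectFirst { case Some(addr) => addr }
}
```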

Once registered, the Worker periodically sends heartbeats to the Master. The Master regularly checks the heartbeat status of all Workers, and if it has not received a heartbeat from a Worker for a long time, it considers that Worker offline. In addition, the network connection between the Master and a Worker is broken when the network fails or a process exits abnormally, and both sides can observe the disconnection event directly. For the Master, once a Worker goes offline, the applications running on that Worker must be marked as abnormal or rescheduled. For the Worker, once it loses its connection to the Master, it must re-enter the registration process.
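A minimal sketch of the heartbeat check is shown below. HeartbeatTracker is a hypothetical helper, not an Aloha class, and time is passed in explicitly so the logic is easy to follow; a real Master would read the clock and run the check on a timer.

```scala
object HeartbeatSketch {
  final class HeartbeatTracker(timeoutMs: Long) {
    // Worker id -> timestamp of the most recent heartbeat.
    private var lastSeen = Map.empty[String, Long]

    def heartbeat(workerId: String, nowMs: Long): Unit =
      lastSeen += (workerId -> nowMs)

    // Returns the workers whose last heartbeat is older than the timeout
    // and stops tracking them, as a master would when declaring them offline.
    def checkTimeouts(nowMs: Long): Set[String] = {
      val dead = lastSeen.collect {
        case (id, ts) if nowMs - ts > timeoutMs => id
      }.toSet
      lastSeen = lastSeen -- dead
      dead
    }

    def aliveWorkers: Set[String] = lastSeen.keySet
  }
}
```

The applications that were running on the returned workers are the ones the Master would then mark as abnormal or reschedule.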

Application submission

You can submit an Application to the Master in two ways: through the REST interface, or by creating a Client that sends RPC calls to the Master's address. In effect, the REST server itself acts as a Client.

When the Master receives a request to register an Application, it assigns an applicationId and places the Application on a waiting list. Scheduling is FIFO: the Master selects a Worker whose remaining resources satisfy the application's requirements and sends it a launch message, and the application moves from the SUBMITTED state to the LAUNCHING state. After receiving the launch request, the Worker creates a working directory for the application and starts a separate worker thread for it. Once the application has started successfully, the Worker sends the Master a message indicating that the application is now RUNNING. From then on, whenever the application's state changes, for example when a task completes successfully or exits unexpectedly, the Worker sends the Master a state-change message. After the application has started, its worker thread blocks and waits for the application to finish. When the Master receives a request to forcibly stop an Application, it forwards the message to the corresponding Worker. The Worker then interrupts the worker thread of that Application, and in response to the interruption the thread calls the Application's forced-shutdown method to stop it.
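The scheduling step can be sketched roughly as below. This is an illustration under assumptions, not Aloha's scheduler: AppRequest, WorkerSlot, and schedule are hypothetical names, and the sketch assumes that an application which fits on no Worker is skipped and stays in the queue rather than blocking the applications behind it.

```scala
import scala.collection.mutable

object FifoSchedulingSketch {
  final case class AppRequest(id: String, cores: Int, memoryMb: Int)

  // Remaining resources of one worker, as tracked by the master.
  final class WorkerSlot(val id: String, var freeCores: Int, var freeMemoryMb: Int) {
    def canRun(app: AppRequest): Boolean =
      freeCores >= app.cores && freeMemoryMb >= app.memoryMb

    def allocate(app: AppRequest): Unit = {
      freeCores -= app.cores
      freeMemoryMb -= app.memoryMb
    }
  }

  // Walks the waiting queue in submission order and assigns each application
  // to the first worker with enough free resources. Returns (appId, workerId)
  // pairs for launched applications; the rest remain in `waiting`, in order.
  def schedule(waiting: mutable.Queue[AppRequest],
               workers: Seq[WorkerSlot]): List[(String, String)] = {
    val launched = mutable.ListBuffer.empty[(String, String)]
    val stillWaiting = mutable.Queue.empty[AppRequest]
    while (waiting.nonEmpty) {
      val app = waiting.dequeue()
      workers.find(_.canRun(app)) match {
        case Some(worker) =>
          worker.allocate(app)
          launched += (app.id -> worker.id)
        case None =>
          stillWaiting.enqueue(app)
      }
    }
    waiting ++= stillWaiting
    launched.toList
  }
}
```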

To support extension to different kinds of applications, the Worker uses a custom ClassLoader to load the dependency packages and configuration file paths supplied by the application when it starts the application. At present, the files must be placed on every Worker in advance, and their paths specified when the application is submitted. To avoid pre-placing files, a distributed file system such as HDFS could be used: dependencies would be uploaded when the user submits the application and downloaded before the application starts. Because each Application's dependencies are loaded separately, users can upgrade applications easily, and dependency conflicts between applications are avoided.
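The idea of per-application class loading can be sketched with the JDK's URLClassLoader. This is an assumption about the general technique rather than a copy of Aloha's loader; classLoaderFor is a hypothetical helper and the paths in the usage are placeholders.

```scala
import java.io.File
import java.net.{URL, URLClassLoader}

object AppClassLoaderSketch {
  // Builds an isolated class loader from the jars and configuration
  // directories of a single application. Note that File.toURI only adds the
  // trailing slash that marks a directory when the directory actually exists.
  def classLoaderFor(paths: Seq[String], parent: ClassLoader): URLClassLoader = {
    val urls: Array[URL] = paths.map(p => new File(p).toURI.toURL).toArray
    new URLClassLoader(urls, parent)
  }
}
```

Because every application gets its own loader instance, two applications can ship different versions of the same library without seeing each other's classes, while JDK classes are still resolved through the shared parent.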

Fault-tolerant mechanism

The Master manages the scheduling of all applications in the cluster, so if the Master fails, the whole cluster becomes unavailable. The Master therefore needs a failure recovery mechanism.

The core of the Master's failure recovery mechanism is state recovery. The Master stores the state of registered Workers and Applications in a persistence engine (FileSystem and ZooKeeper are currently supported, and other engines can be added). Whenever the state of a Worker or Application changes, the Master updates the state saved in the storage engine. When a Master starts, it is in the Standby state. Once it is elected as the Alive node, it first reads the Worker and Application state from the storage engine. If there is no historical state, the Master moves directly to the ALIVE state. Otherwise, the recovery process starts and the Master's state changes to RECOVERING. During recovery, the Master first checks the state of each Application. If the Application has not been scheduled to any Worker, it is put back into the scheduling queue; otherwise its state is set to ApplicationState.UNKNOWN. The Master then checks all Workers, sets each one to WorkerState.UNKNOWN, and tries to send it a MasterChange message. When a Worker receives the MasterChange message, it replies to the Master with the status of all applications running on it. After receiving the reply, the Master sets the corresponding Worker and Applications to WorkerState.ALIVE and ApplicationState.RUNNING respectively. Workers and Applications that have not responded before the timeout are considered offline or abnormally exited. At this point state recovery is complete, and the Master enters the ALIVE state and can process requests from Workers and Applications normally.
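The recovery decisions described above can be condensed into a small sketch. The types below (StoredApp, StoredWorker, RecoveryPlan) and the two functions are hypothetical and model only the decisions, not the persistence engine or the RPC messages.

```scala
object RecoverySketch {
  sealed trait MasterState
  case object Standby extends MasterState
  case object Recovering extends MasterState
  case object Alive extends MasterState

  // State read back from the persistence engine (simplified).
  final case class StoredApp(id: String, workerId: Option[String])
  final case class StoredWorker(id: String)

  final case class RecoveryPlan(
    nextState: MasterState,
    requeued: List[String],        // never scheduled: back to the scheduling queue
    unknownApps: List[String],     // marked UNKNOWN until their worker reports
    workersToNotify: List[String]) // marked UNKNOWN and sent MasterChange

  // Called when a standby master is elected leader.
  def onElectedLeader(apps: List[StoredApp], workers: List[StoredWorker]): RecoveryPlan =
    if (apps.isEmpty && workers.isEmpty) RecoveryPlan(Alive, Nil, Nil, Nil)
    else {
      val (scheduled, unscheduled) = apps.partition(_.workerId.isDefined)
      RecoveryPlan(Recovering, unscheduled.map(_.id), scheduled.map(_.id), workers.map(_.id))
    }

  // Called after the response timeout: workers that never answered are
  // treated as lost, and the master becomes alive.
  def completeRecovery(plan: RecoveryPlan, responded: Set[String]): (MasterState, List[String]) =
    (Alive, plan.workersToNotify.filterNot(responded.contains))
}
```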

In Standalone mode, FILESYSTEM can be used as the storage engine. In this case only one Master runs, and it must be restarted manually after a failure. Alternatively, the Master can be set to HA mode, with multiple Master instances running at the same time and ZooKeeper serving as both the LeaderElectionAgent and the storage engine. If the Master in the Alive state fails, a new Master is elected automatically and the state is restored automatically.

Event bus

When the Master starts up, it creates an event bus and registers multiple event listeners, which can be easily extended to meet different needs. The core of the event bus is an asynchronous event distribution mechanism based on blocking queues. When a new event is received, the event is dispatched to the event listener for processing. Whenever the Master receives a message that the Application state has changed, it puts the corresponding event into the event bus, so that listeners can get the task state change event in time.
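A blocking-queue event bus of this kind can be sketched with java.util.concurrent alone. AsyncEventBus is a hypothetical class written for illustration and is not Aloha's implementation; it uses None as a stop marker and does not guard against listeners that throw.

```scala
import java.util.concurrent.{CopyOnWriteArrayList, LinkedBlockingQueue}

object EventBusSketch {
  final class AsyncEventBus[E](name: String) {
    // Some(event) carries an event; None tells the dispatcher thread to stop.
    private val queue = new LinkedBlockingQueue[Option[E]]()
    private val listeners = new CopyOnWriteArrayList[E => Unit]()

    // Single dispatcher thread: takes events off the queue and hands each
    // one to every registered listener, in registration order.
    private val dispatcher = new Thread(name) {
      override def run(): Unit = {
        var running = true
        while (running) {
          queue.take() match {
            case Some(event) =>
              val it = listeners.iterator()
              while (it.hasNext) it.next()(event)
            case None =>
              running = false
          }
        }
      }
    }
    dispatcher.setDaemon(true)

    def addListener(listener: E => Unit): Unit = { listeners.add(listener); () }
    def start(): Unit = dispatcher.start()
    // Posting never runs listener code on the caller's thread.
    def post(event: E): Unit = queue.put(Some(event))
    // Events posted before stop() are still delivered before the thread exits.
    def stop(): Unit = { queue.put(None); dispatcher.join() }
  }
}
```

Decoupling the poster from the listeners this way means a slow listener delays other events in the queue but never blocks the component that reported the state change.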

RPC

RPC overview

As the previous section shows, Aloha is a distributed system with a lot of communication between the Master and Workers. All communication between these components goes through RPC.

The RPC module in Aloha differs from a traditional RPC framework. It does not require an Interface Description Language (IDL) to define in advance the data structures exchanged by client and server or the services the server provides. Instead, messages are identified and routed directly with Scala's pattern matching. This works because RPC here mainly serves as a bridge between internal components, so cross-language support is not a concern. Routing based on pattern matching reduces code complexity and is easy to use.
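Routing by pattern matching needs nothing beyond case classes and a PartialFunction. The message types and the route helper in this sketch are hypothetical and only illustrate the technique.

```scala
object PatternRoutingSketch {
  // Messages are plain case classes and objects; no IDL or generated stubs.
  final case class RegisterWorker(id: String, cores: Int)
  final case class Heartbeat(id: String)
  case object RequestMasterState

  // The handler is a partial function: each case both identifies
  // the message type and extracts its fields.
  val receive: PartialFunction[Any, String] = {
    case RegisterWorker(id, cores) => s"registered $id with $cores cores"
    case Heartbeat(id)             => s"heartbeat from $id"
    case RequestMasterState        => "ALIVE"
  }

  // Messages that match no case fall through to a default handler.
  def route(message: Any): String =
    receive.applyOrElse(message, (m: Any) => s"unhandled: $m")
}
```

Adding a new message type means adding a case class and one case clause, which is the reduction in complexity that the paragraph above refers to.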

Let’s start with a simple example to understand the basic use of RPC. The core of this is the implementation of RpcEndpoint.

// Standard-library imports used by the client; Aloha's own imports
// (RpcEnv, RpcEndpoint, AlohaConf, ...) are omitted here.
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.util.{Failure, Success}

//------------------------ Server side ----------------------------
object HelloWorldServer {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val rpcEnv: RpcEnv = RpcEnv.create("hello-server", host, 52345, new AlohaConf())
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    // Register the endpoint under a name that clients can look up
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }
}

class HelloEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    println("Service started.")
  }

  // Messages are routed by pattern matching on their type
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case SayHi(msg) =>
      context.reply(s"Aloha: $msg")
    case SayBye(msg) =>
      context.reply(s"Bye :), $msg")
  }

  override def onStop(): Unit = {
    println("Stop hello endpoint")
  }
}

case class SayHi(msg: String)

case class SayBye(msg: String)

//--------------------------- Client side -------------------------------
object HelloWorldClient {
  def main(args: Array[String]): Unit = {
    val host = "localhost"
    val rpcEnv: RpcEnv = RpcEnv.create("hello-client", host, 52345, new AlohaConf, true)
    val endPointRef: RpcEndpointRef = rpcEnv.retrieveEndpointRef(RpcAddress("localhost", 52345), "hello-service")
    val future: Future[String] = endPointRef.ask[String](SayHi("WALL-E"))
    future.onComplete {
      case Success(value) => println(s"Got response: $value")
      case Failure(e) => println(s"Got error: $e")
    }
    Await.result(future, Duration.apply("30s"))
  }
}

RpcEndpoint, RpcEndpointRef and RpcEnv

As the example above shows, RpcEndpoint, RpcEndpointRef, and RpcEnv are the keys to using this RPC framework. If you know a little about the Actor model and the basic concepts of Akka, you can easily relate these three abstractions to Actor, ActorRef, and ActorSystem in Akka. In fact, Spark's RPC was originally implemented on top of Akka, and although the Akka dependency was later removed, the basic design philosophy remained.

Simply put, an RpcEndpoint is a service that can receive messages and respond to them. Both Master and Worker are in fact RpcEndpoints.

RpcEndpoint can respond to received messages in two ways:

  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new AlohaException(self + " does not implement 'receive'")
  }

  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new AlohaException(self + " won't reply anything"))
  }

RpcCallContext is used to respond to the sender of a message, either with a normal reply or with a failure. It decouples the business logic from the data transfer: the server side does not need to know whether the sender of the request is local or remote.

RpcEndpoint also contains a number of life-cycle callback methods, such as onStart, onStop, onError, onConnected, onDisconnected, and onNetworkError.

RpcEndpointRef is a reference to RpcEndpoint, which is the entry point through which the service caller sends the request. By obtaining the RpcEndpointRef corresponding to RpcEndpoint, you can directly send requests to RpcEndpoint. The way to send messages to RpcEndpoint is the same whether the RpcEndpoint is local or remote. That’s what RPC is all about: executing a remote service-provided method just like calling a local method.

RpcEndpointRef provides the following request sending modes:

  // Sends a one-way asynchronous message. Fire-and-forget semantics.
  def send(message: Any): Unit

  // Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and return a [[Future]] to receive the reply within the specified timeout.
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)

  // Send a message to the corresponding [[RpcEndpoint.receiveAndReply]] and get its result within a specified timeout, throw an exception if this fails.
  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }

RpcEnv is the runtime environment of RpcEndpoint. On the one hand, it is responsible for registration of RpcEndpoint, life cycle management of RpcEndpoint, and obtaining corresponding RpcEndpointRef according to the address of RpcEndpoint. On the other hand, it is responsible for further encapsulation of requests, network transmission of underlying data, routing of messages, and so on.

RpcEnv supports two modes: Server mode and Client mode. In Server mode, RpcEndpoints can be registered with the RpcEnv, and a special endpoint, RpcEndpointVerifier, is registered automatically. When an RpcEndpointRef is requested, the RpcEndpointVerifier is used to verify that the corresponding RpcEndpoint exists.

RpcEnv is created through the factory pattern, and the underlying implementation is replaceable. Currently, the Netty-based implementation NettyRpcEnv is used.

Dispatcher, Inbox and Outbox

In NettyRpcEnv, a design similar to mailbox is used for efficient message routing and transmission.

Each RpcEndpoint has an associated Inbox. Inside the Inbox is a message list that stores all messages received by the RpcEndpoint, including RpcMessages that need a reply, OneWayMessages that do not, and various life-cycle status messages. For each message, the corresponding function defined in RpcEndpoint is called to process it. The Dispatcher is responsible for message delivery: for every message received by NettyRpcEnv, the Dispatcher finds the Inbox of the specified Endpoint and delivers the message to it. In addition, the Dispatcher internally starts a MessageLoop, which repeatedly takes from a blocking queue the Endpoints that have newly arrived messages and processes those messages.
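The interplay between Inbox and Dispatcher can be sketched in a few lines. This is a simplified model with hypothetical class shapes: an endpoint is reduced to a function Any => Unit, life-cycle messages are left out, and runOnce stands in for one iteration of the MessageLoop, which in a real implementation would run on a thread pool and block on the queue.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.mutable

object InboxSketch {
  // One inbox per endpoint; the endpoint is modeled as a plain handler function.
  final class Inbox(val name: String, handler: Any => Unit) {
    private val messages = mutable.Queue.empty[Any]

    def post(message: Any): Unit = synchronized { messages.enqueue(message) }

    private def poll(): Option[Any] =
      synchronized { if (messages.isEmpty) None else Some(messages.dequeue()) }

    // Drains all pending messages, invoking the endpoint for each one.
    def process(): Unit = {
      var next = poll()
      while (next.isDefined) {
        handler(next.get)
        next = poll()
      }
    }
  }

  final class Dispatcher {
    private val inboxes = mutable.Map.empty[String, Inbox]
    // Inboxes that have received new messages and are waiting to be processed.
    private val receivers = new LinkedBlockingQueue[Inbox]()

    def register(name: String, handler: Any => Unit): Unit =
      synchronized { inboxes(name) = new Inbox(name, handler) }

    // Delivers a message to the named endpoint's inbox; false if it is unknown.
    def postMessage(endpointName: String, message: Any): Boolean = synchronized {
      inboxes.get(endpointName) match {
        case Some(inbox) =>
          inbox.post(message)
          receivers.offer(inbox)
          true
        case None => false
      }
    }

    // One iteration of the message loop; returns false when nothing is pending.
    def runOnce(): Boolean = Option(receivers.poll()) match {
      case Some(inbox) => inbox.process(); true
      case None => false
    }
  }
}
```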

In line with the Inbox, a mapping between RpcAddress and Outbox is maintained within NettyRpcEnv, with one Outbox for each remote Endpoint. When sending messages through RpcEndpointRef, NettyRpcEnv determines the address of RpcEndpoint. If it is the local Endpoint, the message is delivered through the Dispatcher. If it is a remote Endpoint, the message is delivered to the corresponding Outbox. The Outbox also has a list of messages to be delivered, and when a message is first delivered to a remote Endpoint, a network connection is established and then the message is sent out in sequence.
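The local-versus-remote decision and the lazily connected Outbox can be sketched as below. Router, Outbox, and the connect function are hypothetical simplifications: the network connection is modeled as a function that, given an address, returns a function for writing messages to the wire, and connection setup is synchronous.

```scala
import scala.collection.mutable

object OutboxSketch {
  final case class RpcAddress(host: String, port: Int)

  // Buffers messages for one remote address. The connection is created when
  // the first message is sent, and pending messages are flushed in order.
  final class Outbox(val address: RpcAddress, connect: RpcAddress => (Any => Unit)) {
    private val pending = mutable.Queue.empty[Any]
    private var client: Option[Any => Unit] = None

    def send(message: Any): Unit = {
      pending.enqueue(message)
      if (client.isEmpty) client = Some(connect(address))
      client.foreach { sendOverWire =>
        while (pending.nonEmpty) sendOverWire(pending.dequeue())
      }
    }
  }

  // Chooses between local delivery and the outbox of the remote address.
  final class Router(local: RpcAddress,
                     deliverLocally: Any => Unit,
                     connect: RpcAddress => (Any => Unit)) {
    private val outboxes = mutable.Map.empty[RpcAddress, Outbox]

    def send(target: RpcAddress, message: Any): Unit =
      if (target == local) deliverLocally(message)
      else outboxes.getOrElseUpdate(target, new Outbox(target, connect)).send(message)

    def outboxCount: Int = outboxes.size
  }
}
```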

Network transmission

In NettyRpcEnv, how requests are sent to and replies are received from the remote Endpoint depends on the lower-level network transport module. The network transmission module is mainly a further encapsulation of Netty. The key components and functions are as follows:

  • TransportServer: the server side of the network transport. When NettyRpcEnv starts in Server mode, it creates a TransportServer that waits for connection requests from clients.
  • TransportClient: the client side of the network transport; in effect a further encapsulation of a channel. Once a connection between the two sides is established, there is a TransportClient at each end of the channel, so data can be exchanged in full-duplex mode.
  • TransportClientFactory: the factory class that creates TransportClient instances. It uses connection pooling internally and can reuse connections that are already established.
  • RpcHandler: processes received RPC request messages. NettyRpcEnv hands messages to the Dispatcher for delivery in this interface's methods.
  • RpcResponseCallback: the callback interface for RPC responses. NettyRpcEnv deserializes received data through this interface.
  • TransportRequestHandler: handles request messages, mainly by forwarding them to the RpcHandler.
  • TransportResponseHandler: handles response messages. It records each sent request and its associated RpcResponseCallback, and invokes the corresponding callback once the response is received.
  • TransportChannelHandler: sits at the end of the channel pipeline and, depending on the message type, hands the message to TransportRequestHandler or TransportResponseHandler.
  • TransportContext: used to create TransportServer and TransportClientFactory, and to initialize the Netty channel pipeline.
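The bookkeeping attributed to TransportResponseHandler above, matching each response to the callback of the request that caused it, can be sketched as follows. ResponseTracker is a hypothetical class, and the RpcResponseCallback trait here is a simplified stand-in whose method signatures are assumptions.

```scala
import java.util.concurrent.atomic.AtomicLong
import scala.collection.concurrent.TrieMap

object ResponseHandlerSketch {
  // Simplified stand-in for the callback interface (signatures are assumed).
  trait RpcResponseCallback {
    def onSuccess(response: Array[Byte]): Unit
    def onFailure(e: Throwable): Unit
  }

  final class ResponseTracker {
    private val nextId = new AtomicLong(0)
    // Request id -> callback waiting for that request's response.
    private val outstanding = TrieMap.empty[Long, RpcResponseCallback]

    // Called when a request is sent: remembers the callback under a fresh id.
    def register(callback: RpcResponseCallback): Long = {
      val id = nextId.incrementAndGet()
      outstanding.put(id, callback)
      id
    }

    // Called when a response arrives: invokes and forgets the matching callback.
    def handleResponse(requestId: Long, body: Array[Byte]): Boolean =
      outstanding.remove(requestId) match {
        case Some(cb) => cb.onSuccess(body); true
        case None => false // unknown or already answered request
      }

    def handleFailure(requestId: Long, error: Throwable): Boolean =
      outstanding.remove(requestId) match {
        case Some(cb) => cb.onFailure(error); true
        case None => false
      }

    def outstandingCount: Int = outstanding.size
  }
}
```

Removing the callback before invoking it ensures that each request is answered at most once, even if a duplicate response arrives.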

Other steps, such as bootstrapping the server and client and encoding and decoding messages, follow the usual flow of network communication with Netty and are not detailed here.

Summary

Aloha is a distributed scheduling framework whose implementation is modeled mainly on Spark. This article introduced Aloha's application scenarios and extension mechanisms, and then described its module design and implementation from the top down.

Aloha is now open source on GitHub at github.com/jrthe42/alo… . If you have any questions about the project, please open an issue.


Original address: blog.jrwang.me/2019/aloha-… Please credit the source when reprinting.