1. Brief introduction to PowerJob

PowerJob (OhMyScheduler) is a new generation of distributed task scheduling and computing framework.

PowerJob is an intermediate service used to manage scheduled and delayed tasks in multiple applications. The Map/Reduce processor can also be used for distributed task processing, which is unknown.

2. PowerJob communication

PowerJob provides an independently deployed Server to centrally manage tasks on different clients (called Worker in PowerJob). This involves the communication between the Server and the Worker. In a distributed system, the communication between Server and Client leads to the concept of service discovery.

In a general service discovery pattern, there are three roles:

  • Service provider: One side that provides a service, often providing an API for other services to use.
  • Service consumer: Uses one side of the service of the service provider, that is, the party that uses the service provider API.
  • Registry: Service providers go to a registry to register. Registration information usually includes the service name and service address. The service consumer then goes to the registry and uses the service name to get the service address so that the service consumer can communicate with the service provider.

Using service discovery to ensure high availability of distributed systems, PowerJob does not have a registry. How does it do this?

2.1 Worker Obtains the Server address

In a typical distributed system, service providers register with a registry at the outset so that service consumers can obtain information about service providers through the registry and consume their services.

The same is true for PowerJobs, where similar operations are performed when workers are initialized.

public void init(a) throws Exception {
    // Initialize Akka.// Service discovery
    currentServer = ServerDiscoveryService.discovery();
    if(StringUtils.isEmpty(currentServer) && ! config.isEnableTestMode()) {throw new RuntimeException("can't find any available server, this worker has been quarantined.");
    }
    log.info("[OhMyWorker] discovery server succeed, current server is {}.", currentServer); . }Copy the code

The above is the source code of Worker initialization. It can be seen that a service discovery operation will also be performed when the Worker is initialized. Let’s continue to view the source code.

public static String discovery(a) {

        if (IP2ADDRESS.isEmpty()) {
            OhMyWorker.getConfig().getServerAddress().forEach(x -> IP2ADDRESS.put(x.split(":") [0], x));
        }

        String result = null;

        // Make a request to the current machine
        Check whether a Server is available. If a Server is available, check whether the Server is still available
        String currentServer = OhMyWorker.getCurrentServer();
        if(! StringUtils.isEmpty(currentServer)) { String ip = currentServer.split(":") [0];
            // Directly request the HTTP service of the current Server to reduce the network overhead and reduce the burden on the Server
            String firstServerAddress = IP2ADDRESS.get(ip);
            if(firstServerAddress ! =null) { result = acquire(firstServerAddress); }}//2. If there is no available Server, check the Server in the Server array to find available Server addresses
        for (String httpServerAddress : OhMyWorker.getConfig().getServerAddress()) {
            if (StringUtils.isEmpty(result)) {
                result = acquire(httpServerAddress);
            }else {
                break; }}//3. If no available Server is found, the current Worker is disconnected from the outside world, and error handling is performed
        if (StringUtils.isEmpty(result)) {
            log.warn("[OmsServerDiscovery] can't find any available server, this worker has been quarantined.");

            // If the Server fails for multiple times, the node is disconnected from the outside world. The Server has transferred the second-level tasks to other workers, and the local tasks need to be killed
            // Error handling
            return null;
        }else {
            // Reset the number of failures
            FAILED_COUNT = 0;
            log.debug("[OmsServerDiscovery] current server is {}.", result);
            returnresult; }}Copy the code

Ohmyworker.getconfig ().getServerAddress() returns the Server address configured when the Worker is initialized.

From the above source code, the whole service discovery basically has 3 steps:

  • 1. If a Server is available, check whether the Server is still available. If yes, return the Server
  • 2. If no available Server has been found before (it may be the first time for service discovery), check the servers in the Server array (service list of initial configuration) and return the first available Server
  • 3. If no available Server is found, the current Worker is disconnected and error handling is performed

2.1.1 How do I check whether the Server is Available

Private Static String Acquire (String httpServerAddress) = private static String acquire(String httpServerAddress);

private static String acquire(String httpServerAddress) {
    String result = null;
    String url = String.format(DISCOVERY_URL, httpServerAddress, OhMyWorker.getAppId(), OhMyWorker.getCurrentServer());
    try {
        result = CommonUtils.executeWithRetry0(() -> HttpUtils.get(url));
    }catch (Exception ignore) {
    }
    if(! StringUtils.isEmpty(result)) {try {
            ResultDTO resultDTO = JsonUtils.parseObject(result, ResultDTO.class);
            if (resultDTO.isSuccess()) {
                returnresultDTO.getData().toString(); }}catch (Exception ignore) {
        }
    }
    return null;
}
Copy the code

The above code is very simple. It is to initiate an Http request to the Server address that needs to be verified. The request parameter has the AppId corresponding to the Worker. Determine the data returned by the request, and if the Server returns as expected, the Server is available.

The value of resultdto.getData () is the address of the Server and the port number of Akka in the Server.

Let’s look at how the Server handles this request.

public String getServer(Long appId) {

        Set<String> downServerCache = Sets.newHashSet();

        for (int i = 0; i < RETRY_TIMES; i++) {

            // Get Server from current database without lock
            //1. Obtain the application information from the database. The information contains the Server address bound to the application
            Optional<AppInfoDO> appInfoOpt = appInfoRepository.findById(appId);
            if(! appInfoOpt.isPresent()) {throw new OmsException(appId + " is not registered!");
            }
            String appName = appInfoOpt.get().getAppName();
            String originServer = appInfoOpt.get().getCurrentServer();
            // Check whether the Server bound to the current application is alive, if so, return this Server information
            if (isActive(originServer, downServerCache)) {
                return originServer;
            }

            //2. If the Server is not available, try to change the Server bound to the application to the local Server
            // There is no Server available
            String lockName = String.format(SERVER_ELECT_LOCK, appId);
            boolean lockStatus = lockService.lock(lockName, 30000);
            if(! lockStatus) {try {
                    Thread.sleep(500);
                }catch (Exception ignore) {
                }
                continue;
            }
            try {

                // It is possible that the Server election has been completed on the previous machine
                AppInfoDO appInfo = appInfoRepository.findById(appId).orElseThrow(() -> new RuntimeException("impossible, unless we just lost our database."));
                if (isActive(appInfo.getCurrentServer(), downServerCache)) {
                    return appInfo.getCurrentServer();
                }

                // Usurp this machine as the Server
                appInfo.setCurrentServer(OhMyServer.getActorSystemAddress());
                appInfo.setGmtModified(new Date());

                appInfoRepository.saveAndFlush(appInfo);
                log.info("[ServerSelectService] this server({}) become the new server for app(appId={}).", appInfo.getCurrentServer(), appId);
                return appInfo.getCurrentServer();
            }catch (Exception e) {
                log.warn("[ServerSelectService] write new server to db failed for app {}.", appName);
            }finally{ lockService.unlock(lockName); }}throw new RuntimeException("server elect failed for app " + appId);
    }
Copy the code

The processing operation mainly consists of the following steps:

  • 1. Use the request parameter appId to obtain application information, verify whether the Server in the application information is available, and directly return the Server information if it is available
  • 2, If the Server is not available, execute the Server election. In fact, under the control of distributed locks, each Server sets itself as the binding Server of the application. Finally, the selected Server is returned.

Returning to the title of this section, if a Server is judged to be available.

private boolean isActive(String serverAddress, Set<String> downServerCache) {... ActorSelection serverActor = OhMyServer.getFriendActor(serverAddress);try {
            CompletionStage<Object> askCS = Patterns.ask(serverActor, ping, Duration.ofMillis(PING_TIMEOUT_MS));
            AskResponse response = (AskResponse) askCS.toCompletableFuture().get(PING_TIMEOUT_MS, TimeUnit.MILLISECONDS);
            downServerCache.remove(serverAddress);
            return response.isSuccess();
        }catch (Exception e) {
            log.warn("[ServerSelectService] server({}) was down.", serverAddress);
        }
        downServerCache.add(serverAddress);
        return false;
    }
Copy the code

In fact, it is very simple, is the Server in the Akka service to send a Ping signal, can get the correct response, indicating that the Server is available.

So far, we have sorted out the Server address information bound to the corresponding App by Worker. This allows the Worker to report to the Server something about itself, such as using the heartbeat to stay connected to the Server.

3. The Server obtains the Woker address

Above, the Worker gets the corresponding Server address. Below, the Server gets the information of the Worker managed by itself.

After the Worker obtains the Server address, it periodically sends heartbeat information to the Server using Akka. The heartbeat information contains the local address (IP:port). This port is the local Akka service port. AppName and appId corresponding to the native Worker; The resource usage of the local system.

Let’s take a look at what the Server does after receiving the heartbeat message.

/** * Update status *@paramThe heartbeat Worker's heartbeat */
public static void updateStatus(WorkerHeartbeat heartbeat) {
    Long appId = heartbeat.getAppId();
    String appName = heartbeat.getAppName();
    ClusterStatusHolder clusterStatusHolder = appId2ClusterStatus.computeIfAbsent(appId, ignore -> new ClusterStatusHolder(appName));
    clusterStatusHolder.updateStatus(heartbeat);
}
Copy the code

It mainly updates the status of workers. ClusterStatusHolder is a class that manages the same Worker cluster and maintains a Map internally to store the status of each machine in the cluster.

Next to clusterStatusHolder. The updateStatus (heartbeat) inside a look.

public void updateStatus(WorkerHeartbeat heartbeat) {

        String workerAddress = heartbeat.getWorkerAddress();
        long heartbeatTime = heartbeat.getHeartbeatTime();

        Long oldTime = address2ActiveTime.getOrDefault(workerAddress, -1L);
        if (heartbeatTime < oldTime) {
            log.warn("[ClusterStatusHolder-{}] receive the expired heartbeat from {}, serverTime: {}, heartTime: {}", appName, heartbeat.getWorkerAddress(), System.currentTimeMillis(), heartbeat.getHeartbeatTime());
            return;
        }

        address2ActiveTime.put(workerAddress, heartbeatTime);
        address2Metrics.put(workerAddress, heartbeat.getSystemMetrics());

        List<DeployedContainerInfo> containerInfos = heartbeat.getContainerInfos();
        if (!CollectionUtils.isEmpty(containerInfos)) {
            containerInfos.forEach(containerInfo -> {
                Map<String, DeployedContainerInfo> infos = containerId2Infos.computeIfAbsent(containerInfo.getContainerId(), ignore -> Maps.newConcurrentMap());
                infos.put(workerAddress, containerInfo);
            });
        }
    }
Copy the code

It is based on the heartbeat time and resource usage of the Worker machine.

Although this code is relatively simple, the heartbeat Server can get the address of the Worker through this address, and then the Server can use this address to send the task execution information to the Worker.

4, summarize

From the above analysis, Worker is the service provider in distributed system, Server is the service consumer, and the consumption content is the scheduled task of Worker managed by Server.

We also talked about a registry at the beginning. What registry is in PowerJob? It is also a Server. The Worker registers appName and APP address in the Server by sending heartbeat information.

The concept mentioned above is the Server bound to Worker, that is, the task management of a Worker is completed by a Server, no matter how many Server instances there are in this distributed system. The advantage of this is that tasks are grouped and isolated. I am a little confused about the benefits of grouping and isolating tasks.