Preface
This article takes a look at Flink's JobManagerGateway.
RestfulGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
public interface RestfulGateway extends RpcGateway {

    CompletableFuture<Acknowledge> cancelJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<Acknowledge> stopJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<String> requestRestAddress(@RpcTimeout Time timeout);

    CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<JobResult> requestJobResult(JobID jobId, @RpcTimeout Time timeout);

    CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(
        @RpcTimeout Time timeout);

    CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout Time timeout);

    CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout);

    CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);

    default CompletableFuture<String> triggerSavepoint(
            JobID jobId,
            String targetDirectory,
            boolean cancelJob,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> disposeSavepoint(
            final String savepointPath,
            @RpcTimeout final Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<JobStatus> requestJobStatus(
            JobID jobId,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
            JobID jobId,
            JobVertexID jobVertexId) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> rescaleJob(
            JobID jobId,
            int newParallelism,
            RescalingBehaviour rescalingBehaviour,
            @RpcTimeout Time timeout) {
        throw new UnsupportedOperationException();
    }

    default CompletableFuture<Acknowledge> shutDownCluster() {
        throw new UnsupportedOperationException();
    }
}
- The RestfulGateway interface extends RpcGateway. It declares the methods cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob and shutDownCluster, each of which simply throws UnsupportedOperationException.
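
To make the effect of those throwing default methods concrete, here is a minimal sketch using only the JDK. The types MiniRestfulGateway, LegacyGateway and DefaultMethodDemo are hypothetical stand-ins invented for illustration and are not Flink classes; the method signatures are simplified (plain strings, no timeout parameter).

```java
import java.util.concurrent.CompletableFuture;

class DefaultMethodDemo {
    // Returns true if the gateway overrides triggerSavepoint, false if the
    // throwing default is still in place.
    static boolean supportsSavepoint(MiniRestfulGateway gateway) {
        try {
            gateway.triggerSavepoint("job-1", "/tmp/savepoints");
            return true;
        } catch (UnsupportedOperationException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        MiniRestfulGateway gateway = new LegacyGateway();
        System.out.println(gateway.requestRestAddress().join());
        System.out.println(supportsSavepoint(gateway));
    }
}

// Hypothetical, simplified stand-in for RestfulGateway (not Flink's real type).
interface MiniRestfulGateway {
    // Abstract: every implementation has to provide it.
    CompletableFuture<String> requestRestAddress();

    // Optional operation: throws unless an implementation overrides it.
    default CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
        throw new UnsupportedOperationException();
    }
}

// Hypothetical implementation that only supplies the abstract method.
class LegacyGateway implements MiniRestfulGateway {
    @Override
    public CompletableFuture<String> requestRestAddress() {
        return CompletableFuture.completedFuture("http://localhost:8081");
    }
}
```

One consequence worth noting: because the default body throws directly, the exception surfaces synchronously at the call site rather than as an exceptionally completed future, so a caller cannot handle it with exceptionally or handle on the returned future.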
JobManagerGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java
public interface JobManagerGateway extends RestfulGateway {

    CompletableFuture<Integer> requestBlobServerPort(Time timeout);

    CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);

    CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);

    CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);

    CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);

    CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);

    CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout);

    CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);

    CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout);
}
- The JobManagerGateway interface extends RestfulGateway. It declares requestBlobServerPort, submitJob, cancelJobWithSavepoint, cancelJob, stopJob, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances and requestJobsOverview. Its implementation class is AkkaJobManagerGateway.
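
The inheritance relationship means that code written against the narrower interface accepts any implementation of the wider one. The following sketch illustrates this with hypothetical, cut-down types (SketchRestfulGateway, SketchJobManagerGateway, InMemoryGateway); none of them are Flink classes, and the canned return values are made up for the example.

```java
import java.util.concurrent.CompletableFuture;

class GatewayHierarchyDemo {
    // Written against the narrower interface only, the way a REST handler might be.
    static String describeCluster(SketchRestfulGateway gateway) {
        return "overview: " + gateway.requestClusterOverview().join();
    }

    public static void main(String[] args) {
        SketchJobManagerGateway gateway = new InMemoryGateway();
        // The wider gateway can be passed wherever the narrower one is expected.
        System.out.println(describeCluster(gateway));
        System.out.println("blob port: " + gateway.requestBlobServerPort().join());
    }
}

// Hypothetical, cut-down counterpart of RestfulGateway.
interface SketchRestfulGateway {
    CompletableFuture<String> requestClusterOverview();
}

// Hypothetical, cut-down counterpart of JobManagerGateway: adds operations on top.
interface SketchJobManagerGateway extends SketchRestfulGateway {
    CompletableFuture<Integer> requestBlobServerPort();
}

// Hypothetical implementation that returns canned answers instead of messaging an actor.
class InMemoryGateway implements SketchJobManagerGateway {
    @Override
    public CompletableFuture<String> requestClusterOverview() {
        return CompletableFuture.completedFuture("2 task managers");
    }

    @Override
    public CompletableFuture<Integer> requestBlobServerPort() {
        return CompletableFuture.completedFuture(6124);
    }
}
```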
AkkaJobManagerGateway
flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java
public class AkkaJobManagerGateway implements JobManagerGateway {

    private final ActorGateway jobManagerGateway;
    private final String hostname;

    public AkkaJobManagerGateway(ActorGateway jobManagerGateway) {
        this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);

        final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
        hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
    }

    @Override
    public String getAddress() {
        return jobManagerGateway.path();
    }

    @Override
    public String getHostname() {
        return hostname;
    }

    @Override
    public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestBlobManagerPort(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(Integer.class)));
    }

    //--------------------------------------------------------------------------------
    // Job control
    //--------------------------------------------------------------------------------

    @Override
    public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
        return FutureUtils
            .toJava(
                jobManagerGateway.ask(
                    new JobManagerMessages.SubmitJob(
                        jobGraph,
                        listeningBehaviour),
                    FutureUtils.toFiniteDuration(timeout)))
            .thenApply(
                (Object response) -> {
                    if (response instanceof JobManagerMessages.JobSubmitSuccess) {
                        JobManagerMessages.JobSubmitSuccess success = ((JobManagerMessages.JobSubmitSuccess) response);
                        if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
                            return Acknowledge.get();
                        } else {
                            throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
                                jobGraph.getJobID() + ", response: " + success.jobId()));
                        }
                    } else if (response instanceof JobManagerMessages.JobResultFailure) {
                        JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);
                        throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
                    } else {
                        throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
                    }
                });
    }

    @Override
    public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) {
        CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));

        return cancellationFuture.thenApply(
            (JobManagerMessages.CancellationResponse response) -> {
                if (response instanceof JobManagerMessages.CancellationSuccess) {
                    return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
                } else {
                    throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
                }
            });
    }

    @Override
    public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));

        return responseFuture.thenApply(
            (JobManagerMessages.CancellationResponse response) -> {
                if (response instanceof JobManagerMessages.CancellationSuccess) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
                }
            });
    }

    @Override
    public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));

        return responseFuture.thenApply(
            (JobManagerMessages.StoppingResponse response) -> {
                if (response instanceof JobManagerMessages.StoppingSuccess) {
                    return Acknowledge.get();
                } else {
                    throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
                }
            });
    }

    //--------------------------------------------------------------------------------
    // JobManager information
    //--------------------------------------------------------------------------------

    @Override
    public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.RequestTaskManagerInstance(resourceId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
            .thenApply(
                (JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
                    if (taskManagerResponse.instance().isDefined()) {
                        return Optional.of(taskManagerResponse.instance().get());
                    } else {
                        return Optional.empty();
                    }
                });
    }

    @Override
    public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) {
        CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));

        return taskManagersFuture.thenApply(
            JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
    }

    @Override
    public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
        return FutureUtils
            .toJava(jobManagerGateway
                .ask(
                    new JobManagerMessages.RequestClassloadingProps(jobId),
                    FutureUtils.toFiniteDuration(timeout)))
            .thenApply(
                (Object response) -> {
                    if (response instanceof JobManagerMessages.ClassloadingProps) {
                        return Optional.of(((JobManagerMessages.ClassloadingProps) response));
                    } else if (response instanceof JobManagerMessages.JobNotFound) {
                        return Optional.empty();
                    } else {
                        throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
                    }
                });
    }

    @Override
    public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
    }

    @Override
    public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
        CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
            jobManagerGateway
                .ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));

        return jobResponseFuture.thenApply(
            (JobManagerMessages.JobResponse jobResponse) -> {
                if (jobResponse instanceof JobManagerMessages.JobFound) {
                    return ((JobManagerMessages.JobFound) jobResponse).executionGraph();
                } else {
                    throw new CompletionException(new FlinkJobNotFoundException(jobId));
                }
            });
    }

    @Override
    public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
        return requestJob(jobId, timeout).thenApply(JobResult::createFrom);
    }

    @Override
    public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(ClusterOverview.class)));
    }

    @Override
    public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
        final String jobManagerPath = getAddress();
        final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;

        return CompletableFuture.completedFuture(
            Collections.singleton(jobManagerMetricQueryServicePath));
    }

    @Override
    public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
        return requestTaskManagerInstances(timeout)
            .thenApply(
                (Collection<Instance> instances) ->
                    instances
                        .stream()
                        .map(
                            (Instance instance) -> {
                                final String taskManagerAddress = instance.getTaskManagerGateway().getAddress();
                                final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
                                    MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();

                                return Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
                            })
                        .collect(Collectors.toList()));
    }

    @Override
    public CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusOverview.class)));
    }

    @Override
    public CompletableFuture<String> requestRestAddress(Time timeout) {
        return FutureUtils.toJava(
            jobManagerGateway
                .ask(JobManagerMessages.getRequestRestAddress(), FutureUtils.toFiniteDuration(timeout))
                .mapTo(ClassTag$.MODULE$.apply(String.class)));
    }
}
- AkkaJobManagerGateway implements the JobManagerGateway interface. Its constructor takes an ActorGateway, which it stores in the jobManagerGateway field.
- The requestBlobServerPort method sends a RequestBlobManagerPort message; submitJob sends a SubmitJob message; cancelJobWithSavepoint sends a CancelJobWithSavepoint message; cancelJob sends a CancelJob message; stopJob sends a StopJob message.
- The requestTaskManagerInstance method sends a RequestTaskManagerInstance message; requestTaskManagerInstances sends a RequestRegisteredTaskManagers message; requestClassloadingProps sends a RequestClassloadingProps message; requestMultipleJobDetails sends a RequestJobDetails message; requestJob sends a RequestJob message; requestClusterOverview sends a RequestStatusOverview message; requestJobsOverview sends a RequestJobsWithIDsOverview message; requestRestAddress sends a RequestRestAddress message.
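
The methods listed above share one shape: send a message, receive an untyped reply as a future, then map the reply to a result or a failure inside thenApply. The sketch below reproduces that shape with the JDK only. Everything in it is a hypothetical stand-in: Asker plays the role of ActorGateway.ask, the nested message classes are loosely modelled on those in JobManagerMessages, IllegalStateException takes the place of FlinkException, and a plain string replaces Acknowledge.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class AskPatternDemo {
    // Hypothetical message and reply types.
    static final class CancelJob {
        final String jobId;
        CancelJob(String jobId) { this.jobId = jobId; }
    }

    static final class CancellationSuccess { }

    static final class CancellationFailure {
        final Throwable cause;
        CancellationFailure(Throwable cause) { this.cause = cause; }
    }

    // Hypothetical stand-in for the actor gateway: message in, untyped future reply out.
    interface Asker {
        CompletableFuture<Object> ask(Object message);
    }

    // Mirrors the structure of cancelJob: ask, then translate the reply.
    static CompletableFuture<String> cancelJob(Asker gateway, String jobId) {
        return gateway.ask(new CancelJob(jobId)).thenApply(response -> {
            if (response instanceof CancellationSuccess) {
                return "ack";
            } else if (response instanceof CancellationFailure) {
                throw new CompletionException(new IllegalStateException(
                    "Cancel job failed " + jobId + '.', ((CancellationFailure) response).cause));
            } else {
                throw new CompletionException(new IllegalStateException("Unknown response: " + response));
            }
        });
    }

    // Test helper: an Asker that answers every message with the same reply.
    static Asker replyingWith(Object reply) {
        return message -> CompletableFuture.completedFuture(reply);
    }

    // Collapses a future into a printable outcome.
    static String outcome(CompletableFuture<String> future) {
        try {
            return future.join();
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcome(cancelJob(replyingWith(new CancellationSuccess()), "job-1")));
        System.out.println(outcome(cancelJob(
            replyingWith(new CancellationFailure(new RuntimeException("boom"))), "job-1")));
    }
}
```

Throwing CompletionException from inside thenApply, as the original code does, makes the returned future complete exceptionally with the wrapped exception as its cause, so callers see the failure through the future instead of at the call site.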
Summary
- The RestfulGateway interface extends RpcGateway. It declares the methods cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob and shutDownCluster, each of which simply throws UnsupportedOperationException.
- The JobManagerGateway interface extends RestfulGateway. It declares requestBlobServerPort, submitJob, cancelJobWithSavepoint, cancelJob, stopJob, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances and requestJobsOverview. Its implementation class is AkkaJobManagerGateway.
- AkkaJobManagerGateway implements the JobManagerGateway interface, and its constructor takes an ActorGateway (stored in the jobManagerGateway field). Most of its methods work by sending a message through jobManagerGateway, making heavy use of the message classes defined in JobManagerMessages.
doc
- JobManagerGateway