Preface

This article examines Flink's JobManagerGateway.

RestfulGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java

public interface RestfulGateway extends RpcGateway {

	CompletableFuture<Acknowledge> cancelJob(JobID jobId, @RpcTimeout Time timeout);

	CompletableFuture<Acknowledge> stopJob(JobID jobId, @RpcTimeout Time timeout);

	CompletableFuture<String> requestRestAddress(@RpcTimeout  Time timeout);

	CompletableFuture<? extends AccessExecutionGraph> requestJob(JobID jobId, @RpcTimeout Time timeout);

	CompletableFuture<JobResult> requestJobResult(JobID jobId, @RpcTimeout Time timeout);

	CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(
		@RpcTimeout Time timeout);

	CompletableFuture<ClusterOverview> requestClusterOverview(@RpcTimeout Time timeout);

	CompletableFuture<Collection<String>> requestMetricQueryServicePaths(@RpcTimeout Time timeout);

	CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(@RpcTimeout Time timeout);

	default CompletableFuture<String> triggerSavepoint(
			JobID jobId,
			String targetDirectory,
			boolean cancelJob,
			@RpcTimeout Time timeout) {
		throw new UnsupportedOperationException();
	}

	default CompletableFuture<Acknowledge> disposeSavepoint(
			final String savepointPath,
			@RpcTimeout final Time timeout) {
		throw new UnsupportedOperationException();
	}

	default CompletableFuture<JobStatus> requestJobStatus(
			JobID jobId,
			@RpcTimeout Time timeout) {
		throw new UnsupportedOperationException();
	}

	default CompletableFuture<OperatorBackPressureStatsResponse> requestOperatorBackPressureStats(
			JobID jobId,
			JobVertexID jobVertexId) {
		throw new UnsupportedOperationException();
	}

	default CompletableFuture<Acknowledge> rescaleJob(
			JobID jobId,
			int newParallelism,
			RescalingBehaviour rescalingBehaviour,
			@RpcTimeout Time timeout) {
		throw new UnsupportedOperationException();
	}

	default CompletableFuture<Acknowledge> shutDownCluster() {
		throw new UnsupportedOperationException();
	}
}

  • The RestfulGateway interface extends RpcGateway. It declares the methods cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths, and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob, and shutDownCluster, each of which throws UnsupportedOperationException.
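
The default-method pattern described above can be sketched in isolation. The following is a minimal illustration, not Flink code: SimpleGateway, LegacyGateway, and DefaultMethodDemo are hypothetical names, and plain strings stand in for JobID and Acknowledge. It shows how an implementation that overrides only the abstract methods inherits the throwing defaults.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical, simplified stand-in for RestfulGateway: one abstract method
// and one optional method whose default implementation throws.
interface SimpleGateway {

	CompletableFuture<String> cancelJob(String jobId);

	default CompletableFuture<String> triggerSavepoint(String jobId, String targetDirectory) {
		throw new UnsupportedOperationException();
	}
}

// Hypothetical implementation that supports only the mandatory operation.
class LegacyGateway implements SimpleGateway {

	@Override
	public CompletableFuture<String> cancelJob(String jobId) {
		return CompletableFuture.completedFuture("cancelled " + jobId);
	}
}

class DefaultMethodDemo {

	// Returns true if the gateway rejects the optional operation.
	static boolean savepointUnsupported(SimpleGateway gateway) {
		try {
			gateway.triggerSavepoint("job-1", "/tmp/savepoints");
			return false;
		} catch (UnsupportedOperationException e) {
			return true;
		}
	}

	public static void main(String[] args) {
		SimpleGateway gateway = new LegacyGateway();
		System.out.println(gateway.cancelJob("job-1").join());
		System.out.println("savepoint unsupported: " + savepointUnsupported(gateway));
	}
}
```

Note that the exception is thrown synchronously at the call site rather than delivered through an exceptionally completed future, so callers of such optional methods may want a try/catch around the call itself.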

JobManagerGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java

public interface JobManagerGateway extends RestfulGateway {

	CompletableFuture<Integer> requestBlobServerPort(Time timeout);

	CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout);

	CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, @Nullable String savepointPath, Time timeout);

	CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout);

	CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout);

	CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout);

	CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout);

	CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout);

	CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout);
}
  • The JobManagerGateway interface extends RestfulGateway. It declares the methods requestBlobServerPort, submitJob, cancelJobWithSavepoint, cancelJob, stopJob, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances, and requestJobsOverview. It is implemented by the class AkkaJobManagerGateway.

AkkaJobManagerGateway

flink-1.7.2/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java

public class AkkaJobManagerGateway implements JobManagerGateway {

	private final ActorGateway jobManagerGateway;
	private final String hostname;

	public AkkaJobManagerGateway(ActorGateway jobManagerGateway) {
		this.jobManagerGateway = Preconditions.checkNotNull(jobManagerGateway);

		final Option<String> optHostname = jobManagerGateway.actor().path().address().host();
		hostname = optHostname.isDefined() ? optHostname.get() : "localhost";
	}

	@Override
	public String getAddress() {
		return jobManagerGateway.path();
	}

	@Override
	public String getHostname() {
		return hostname;
	}

	@Override
	public CompletableFuture<Integer> requestBlobServerPort(Time timeout) {
		return FutureUtils.toJava(
			jobManagerGateway
				.ask(JobManagerMessages.getRequestBlobManagerPort(), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(Integer.class)));
	}

	//--------------------------------------------------------------------------------
	// Job control
	//--------------------------------------------------------------------------------

	@Override
	public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, ListeningBehaviour listeningBehaviour, Time timeout) {
		return FutureUtils
			.toJava(
				jobManagerGateway.ask(
					new JobManagerMessages.SubmitJob(
						jobGraph,
						listeningBehaviour),
					FutureUtils.toFiniteDuration(timeout)))
			.thenApply(
				(Object response) -> {
					if (response instanceof JobManagerMessages.JobSubmitSuccess) {
						JobManagerMessages.JobSubmitSuccess success = ((JobManagerMessages.JobSubmitSuccess) response);

						if (Objects.equals(success.jobId(), jobGraph.getJobID())) {
							return Acknowledge.get();
						} else {
							throw new CompletionException(new FlinkException("JobManager responded for wrong Job. This Job: " +
								jobGraph.getJobID() + ", response: " + success.jobId()));
						}
					} else if (response instanceof JobManagerMessages.JobResultFailure) {
						JobManagerMessages.JobResultFailure failure = ((JobManagerMessages.JobResultFailure) response);

						throw new CompletionException(new FlinkException("Job submission failed.", failure.cause()));
					} else {
						throw new CompletionException(new FlinkException("Unknown response to SubmitJob message: " + response + '.'));
					}
				});
	}

	@Override
	public CompletableFuture<String> cancelJobWithSavepoint(JobID jobId, String savepointPath, Time timeout) {
		CompletableFuture<JobManagerMessages.CancellationResponse> cancellationFuture = FutureUtils.toJava(
			jobManagerGateway
				.ask(new JobManagerMessages.CancelJobWithSavepoint(jobId, savepointPath), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));

		return cancellationFuture.thenApply(
			(JobManagerMessages.CancellationResponse response) -> {
				if (response instanceof JobManagerMessages.CancellationSuccess) {
					return ((JobManagerMessages.CancellationSuccess) response).savepointPath();
				} else {
					throw new CompletionException(new FlinkException("Cancel with savepoint failed.", ((JobManagerMessages.CancellationFailure) response).cause()));
				}
			});
	}

	@Override
	public CompletableFuture<Acknowledge> cancelJob(JobID jobId, Time timeout) {
		CompletableFuture<JobManagerMessages.CancellationResponse> responseFuture = FutureUtils.toJava(
			jobManagerGateway
				.ask(new JobManagerMessages.CancelJob(jobId), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.CancellationResponse.class)));

		return responseFuture.thenApply(
			(JobManagerMessages.CancellationResponse response) -> {
				if (response instanceof JobManagerMessages.CancellationSuccess) {
					return Acknowledge.get();
				} else {
					throw new CompletionException(new FlinkException("Cancel job failed " + jobId + '.', ((JobManagerMessages.CancellationFailure) response).cause()));
				}
			});
	}

	@Override
	public CompletableFuture<Acknowledge> stopJob(JobID jobId, Time timeout) {
		CompletableFuture<JobManagerMessages.StoppingResponse> responseFuture = FutureUtils.toJava(
			jobManagerGateway
				.ask(new JobManagerMessages.StopJob(jobId), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.StoppingResponse.class)));

		return responseFuture.thenApply(
			(JobManagerMessages.StoppingResponse response) -> {
				if (response instanceof JobManagerMessages.StoppingSuccess) {
					return Acknowledge.get();
				} else {
					throw new CompletionException(new FlinkException("Stop job failed " + jobId + '.', ((JobManagerMessages.StoppingFailure) response).cause()));
				}
			});
	}

	//--------------------------------------------------------------------------------
	// JobManager information
	//--------------------------------------------------------------------------------

	@Override
	public CompletableFuture<Optional<Instance>> requestTaskManagerInstance(ResourceID resourceId, Time timeout) {
		return FutureUtils.toJava(
			jobManagerGateway
				.ask(new JobManagerMessages.RequestTaskManagerInstance(resourceId), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.<JobManagerMessages.TaskManagerInstance>apply(JobManagerMessages.TaskManagerInstance.class)))
			.thenApply(
				(JobManagerMessages.TaskManagerInstance taskManagerResponse) -> {
					if (taskManagerResponse.instance().isDefined()) {
						return Optional.of(taskManagerResponse.instance().get());
					} else {
						return Optional.empty();
					}
				});
	}

	@Override
	public CompletableFuture<Collection<Instance>> requestTaskManagerInstances(Time timeout) {
		CompletableFuture<JobManagerMessages.RegisteredTaskManagers> taskManagersFuture = FutureUtils.toJava(
			jobManagerGateway
				.ask(JobManagerMessages.getRequestRegisteredTaskManagers(), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.RegisteredTaskManagers.class)));

		return taskManagersFuture.thenApply(
			JobManagerMessages.RegisteredTaskManagers::asJavaCollection);
	}

	@Override
	public CompletableFuture<Optional<JobManagerMessages.ClassloadingProps>> requestClassloadingProps(JobID jobId, Time timeout) {
		return FutureUtils
			.toJava(jobManagerGateway
				.ask(
					new JobManagerMessages.RequestClassloadingProps(jobId),
					FutureUtils.toFiniteDuration(timeout)))
			.thenApply(
				(Object response) -> {
					if (response instanceof JobManagerMessages.ClassloadingProps) {
						return Optional.of(((JobManagerMessages.ClassloadingProps) response));
					} else if (response instanceof JobManagerMessages.JobNotFound) {
						return Optional.empty();
					} else {
						throw new CompletionException(new FlinkException("Unknown response: " + response + '.'));
					}
				});
	}

	@Override
	public CompletableFuture<MultipleJobsDetails> requestMultipleJobDetails(Time timeout) {
		return FutureUtils.toJava(
			jobManagerGateway
				.ask(new RequestJobDetails(true, true), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(MultipleJobsDetails.class)));
	}

	@Override
	public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
		CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
			jobManagerGateway
				.ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(JobManagerMessages.JobResponse.class)));

		return jobResponseFuture.thenApply(
			(JobManagerMessages.JobResponse jobResponse) -> {
				if (jobResponse instanceof JobManagerMessages.JobFound) {
					return ((JobManagerMessages.JobFound) jobResponse).executionGraph();
				} else {
					throw new CompletionException(new FlinkJobNotFoundException(jobId));
				}
			});
	}

	@Override
	public CompletableFuture<JobResult> requestJobResult(JobID jobId, Time timeout) {
		return requestJob(jobId, timeout).thenApply(JobResult::createFrom);
	}

	@Override
	public CompletableFuture<ClusterOverview> requestClusterOverview(Time timeout) {
		return FutureUtils.toJava(
			jobManagerGateway
				.ask(RequestStatusOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(ClusterOverview.class)));
	}

	@Override
	public CompletableFuture<Collection<String>> requestMetricQueryServicePaths(Time timeout) {
		final String jobManagerPath = getAddress();
		final String jobManagerMetricQueryServicePath = jobManagerPath.substring(0, jobManagerPath.lastIndexOf('/') + 1) + MetricQueryService.METRIC_QUERY_SERVICE_NAME;

		return CompletableFuture.completedFuture(
			Collections.singleton(jobManagerMetricQueryServicePath));
	}

	@Override
	public CompletableFuture<Collection<Tuple2<ResourceID, String>>> requestTaskManagerMetricQueryServicePaths(Time timeout) {
		return requestTaskManagerInstances(timeout)
			.thenApply(
				(Collection<Instance> instances) ->
					instances
						.stream()
						.map(
							(Instance instance) -> {
								final String taskManagerAddress = instance.getTaskManagerGateway().getAddress();
								final String taskManagerMetricQuerServicePath = taskManagerAddress.substring(0, taskManagerAddress.lastIndexOf('/') + 1) +
									MetricQueryService.METRIC_QUERY_SERVICE_NAME + '_' + instance.getTaskManagerID().getResourceIdString();

								return Tuple2.of(instance.getTaskManagerID(), taskManagerMetricQuerServicePath);
							})
						.collect(Collectors.toList()));
	}

	@Override
	public CompletableFuture<JobIdsWithStatusOverview> requestJobsOverview(Time timeout) {
		return FutureUtils.toJava(
			jobManagerGateway
				.ask(RequestJobsWithIDsOverview.getInstance(), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(JobIdsWithStatusOverview.class)));
	}

	@Override
	public CompletableFuture<String> requestRestAddress(Time timeout) {
		return FutureUtils.toJava(
			jobManagerGateway
				.ask(JobManagerMessages.getRequestRestAddress(), FutureUtils.toFiniteDuration(timeout))
				.mapTo(ClassTag$.MODULE$.apply(String.class)));
	}
}

  • AkkaJobManagerGateway implements the JobManagerGateway interface. Its constructor takes an ActorGateway, which it stores in the jobManagerGateway field; the hostname is derived from the actor's address and falls back to "localhost".
  • The job-control methods each send one message through jobManagerGateway: requestBlobServerPort sends RequestBlobManagerPort, submitJob sends SubmitJob, cancelJobWithSavepoint sends CancelJobWithSavepoint, cancelJob sends CancelJob, and stopJob sends StopJob.
  • The information methods follow the same pattern: requestTaskManagerInstance sends RequestTaskManagerInstance, requestTaskManagerInstances sends RequestRegisteredTaskManagers, requestClassloadingProps sends RequestClassloadingProps, requestMultipleJobDetails sends RequestJobDetails, requestJob sends RequestJob, requestClusterOverview sends RequestStatusOverview, requestJobsOverview sends RequestJobsWithIDsOverview, and requestRestAddress sends RequestRestAddress.
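
The recurring shape of these methods (send a message, receive an untyped reply as a future, then map the reply to a result or a failure) can be sketched without Akka. This is a simplified illustration under stated assumptions: AskPatternSketch, its ask method, and the two response classes are hypothetical stand-ins for ActorGateway.ask and the JobManagerMessages types, and a plain Exception replaces FlinkException.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical response messages, standing in for the ones in JobManagerMessages.
class CancellationSuccess {}

class CancellationFailure {
	final Throwable cause;

	CancellationFailure(Throwable cause) {
		this.cause = cause;
	}
}

class AskPatternSketch {

	// Stand-in for ActorGateway.ask(...): returns a future holding an untyped reply.
	static CompletableFuture<Object> ask(Object request) {
		Object reply = "cancel:known-job".equals(request)
			? new CancellationSuccess()
			: new CancellationFailure(new IllegalStateException("job not found"));
		return CompletableFuture.completedFuture(reply);
	}

	// Mirrors the shape of cancelJob: send a message, then map the reply
	// to a result, or fail the future by throwing CompletionException.
	static CompletableFuture<String> cancelJob(String jobId) {
		return ask("cancel:" + jobId).thenApply(response -> {
			if (response instanceof CancellationSuccess) {
				return "ACK";
			} else if (response instanceof CancellationFailure) {
				throw new CompletionException(
					new Exception("Cancel job failed " + jobId + '.', ((CancellationFailure) response).cause));
			} else {
				throw new CompletionException(new Exception("Unknown response: " + response));
			}
		});
	}

	public static void main(String[] args) {
		System.out.println(cancelJob("known-job").join());
		System.out.println(cancelJob("other-job").isCompletedExceptionally());
	}
}
```

Throwing CompletionException inside thenApply completes the returned future exceptionally, which is how the listing above turns failure messages into failed futures instead of returning them as values.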

Summary

  • The RestfulGateway interface extends RpcGateway. It declares the methods cancelJob, stopJob, requestRestAddress, requestJob, requestJobResult, requestMultipleJobDetails, requestClusterOverview, requestMetricQueryServicePaths, and requestTaskManagerMetricQueryServicePaths. It also provides the default methods triggerSavepoint, disposeSavepoint, requestJobStatus, requestOperatorBackPressureStats, rescaleJob, and shutDownCluster, each of which throws UnsupportedOperationException.
  • The JobManagerGateway interface extends RestfulGateway. It declares the methods requestBlobServerPort, submitJob, cancelJobWithSavepoint, cancelJob, stopJob, requestClassloadingProps, requestTaskManagerInstance, requestTaskManagerInstances, and requestJobsOverview. It is implemented by the class AkkaJobManagerGateway.
  • AkkaJobManagerGateway implements the JobManagerGateway interface, and its constructor takes an ActorGateway named jobManagerGateway. Most of its methods send a message through jobManagerGateway and convert the reply, relying heavily on the message objects defined in JobManagerMessages.

doc

  • JobManagerGateway