Hmm, first of all, I don’t expect everyone to agree with this; after all, I am also just reading other people’s documents and code. Treat this series of posts as my notes after reading them.

Flink 1.14’s generalized incremental checkpoints: github.com/apache/flin…

The back pressure monitoring proposal was initiated by Roman, from Flink’s German parent company.

How tasks are executed at the bottom layer of the TaskManager

TaskExecutor is the component inside each TaskManager that executes a set of tasks. To see it in action, you can run this test from the Flink source code:

org/apache/flink/runtime/taskexecutor/TaskExecutorITCase#testJobRecoveryWithFailingTaskExecutor

As you can see in the test below, miniCluster.startTaskManager() starts an additional TaskExecutor inside a MiniCluster (a minimal cluster that runs in the local process).

@Test
public void testJobRecoveryWithFailingTaskExecutor() throws Exception {
    final JobGraph jobGraph = createJobGraphWithRestartStrategy(PARALLELISM);
    final CompletableFuture<JobResult> jobResultFuture = submitJobAndWaitUntilRunning(jobGraph);

    // start an additional TaskExecutor
    miniCluster.startTaskManager();

    miniCluster.terminateTaskManager(0).get(); // this should fail the job

    BlockingOperator.unblock();

    assertThat(jobResultFuture.get().isSuccess(), is(true));
}

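The comment "this should fail the job" and the final assertion on isSuccess() can look contradictory at first. My reading is that the job fails over when its TaskExecutor is terminated and is then restarted on the extra TaskExecutor, so it can still finish successfully. Below is a toy model of that idea in plain Java. ToyRecovery, its methods, the restart budget and the IDs "tm-0" / "tm-1" are all names I made up for illustration; this is not Flink code and it ignores slots, parallelism and restart delays.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: ToyRecovery is a hypothetical class, not part of Flink.
class ToyRecovery {
    private final List<String> executors = new ArrayList<>();
    private String runningOn;   // executor currently hosting the job, or null
    private int restartsLeft;   // restart budget, standing in for a restart strategy
    private int restartCount;   // how many times the job has been restarted
    private boolean failed;     // true once the job has failed for good

    ToyRecovery(int maxRestarts) {
        this.restartsLeft = maxRestarts;
    }

    void startExecutor(String id) {
        executors.add(id);
    }

    void submitJob() {
        if (executors.isEmpty()) {
            throw new IllegalStateException("no executor available");
        }
        runningOn = executors.get(0);
    }

    void terminateExecutor(String id) {
        executors.remove(id);
        if (!id.equals(runningOn)) {
            return; // the job was not running there, nothing to recover
        }
        if (restartsLeft > 0 && !executors.isEmpty()) {
            // fail over: spend one restart and move to a surviving executor
            restartsLeft--;
            restartCount++;
            runningOn = executors.get(0);
        } else {
            // no restart budget or no executor left: the job fails for good
            failed = true;
            runningOn = null;
        }
    }

    boolean isSuccess() {
        return !failed && runningOn != null;
    }

    int getRestartCount() {
        return restartCount;
    }

    public static void main(String[] args) {
        ToyRecovery cluster = new ToyRecovery(1);
        cluster.startExecutor("tm-0");
        cluster.submitJob();
        cluster.startExecutor("tm-1");      // the additional executor
        cluster.terminateExecutor("tm-0");  // the job fails here and restarts
        // prints "true after 1 restart(s)"
        System.out.println(cluster.isSuccess() + " after " + cluster.getRestartCount() + " restart(s)");
    }
}
```

Without the second executor (or with a restart budget of 0) the same termination leaves the toy job failed, which is presumably why the real test starts an additional TaskExecutor before terminating the first one.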

In MiniCluster#startTaskManager you can see that the whole method body runs inside synchronized (lock), a single lock object shared by the MiniCluster, so TaskExecutors are started and registered one at a time. Here we also meet the TaskManagerRunner from the flowchart above: it receives a ResourceID built from a random UUID for each new TaskExecutor (not for each submitted task) and returns the TaskExecutor that owns those resources. That TaskExecutor is then started with taskExecutor.start() and added to the taskManagers list.

public void startTaskManager() throws Exception {
    synchronized (lock) {
        final Configuration configuration = miniClusterConfiguration.getConfiguration();

        final TaskExecutor taskExecutor =
                TaskManagerRunner.startTaskManager(
                        configuration,
                        new ResourceID(UUID.randomUUID().toString()),
                        taskManagerRpcServiceFactory.createRpcService(),
                        haServices,
                        heartbeatServices,
                        metricRegistry,
                        blobCacheService,
                        useLocalCommunication(),
                        ExternalResourceInfoProvider.NO_EXTERNAL_RESOURCES,
                        taskManagerTerminatingFatalErrorHandlerFactory.create(
                                taskManagers.size()));

        taskExecutor.start();
        taskManagers.add(taskExecutor);
    }
}
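
To make the pattern in startTaskManager easier to see on its own, here is a stripped-down sketch in plain Java: take the lock, build an executor with a random UUID as its ID, start it, then add it to the list. ToyMiniCluster and ToyTaskExecutor are hypothetical stand-ins I wrote for this post, not Flink classes, and everything the real method passes in (RPC service, HA services, heartbeats, metrics and so on) is left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

// Toy model of the registration pattern; ToyMiniCluster and ToyTaskExecutor
// are hypothetical stand-ins, not Flink classes.
class ToyMiniCluster {

    /** Stand-in for TaskExecutor: it only records its ID and whether it was started. */
    static final class ToyTaskExecutor {
        final String resourceId;
        private boolean started;

        ToyTaskExecutor(String resourceId) {
            this.resourceId = resourceId;
        }

        void start() {
            started = true;
        }

        boolean isStarted() {
            return started;
        }
    }

    private final Object lock = new Object();
    private final List<ToyTaskExecutor> taskManagers = new ArrayList<>();

    ToyTaskExecutor startTaskManager() {
        // Only one thread at a time may create and register an executor.
        synchronized (lock) {
            // A fresh random UUID identifies each new executor.
            ToyTaskExecutor taskExecutor = new ToyTaskExecutor(UUID.randomUUID().toString());
            taskExecutor.start();
            taskManagers.add(taskExecutor);
            return taskExecutor;
        }
    }

    int size() {
        synchronized (lock) {
            return taskManagers.size();
        }
    }

    public static void main(String[] args) {
        ToyMiniCluster cluster = new ToyMiniCluster();
        ToyTaskExecutor first = cluster.startTaskManager();
        ToyTaskExecutor second = cluster.startTaskManager();
        System.out.println(cluster.size() + " executors, distinct IDs: "
                + !first.resourceId.equals(second.resourceId));
    }
}
```

Because the list is only read and written while holding the same lock, two threads calling startTaskManager at once cannot corrupt it, and each registered executor is already started by the time another thread can see it.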

To be continued... I’m off for a run; my back is a bit sore Orz