This article was first published at www.yuque.com/17sing

Version | Date     | Note
1.0     | 2022.2.2 | First published

0. Foreword

Flink has been in production with us for some time. When we first went live, I happened to troubleshoot a Checkpoint timeout caused by data skew. I only had a rough understanding of the mechanism at the time, and I have recently been reading the Flink source code, so this article records what I learned.

What exactly is the difference between Exactly-Once and End-to-End Exactly-Once?

  • Exactly Once: data is neither lost nor duplicated inside the computing engine. In essence, Flink achieves this by enabling checkpointing with barrier alignment.
  • End-to-End Exactly Once: data is neither lost nor duplicated from the moment it is read, through processing by the engine, to being written to external storage. This requires the data source to be replayable, and the writing side to support transaction recovery and rollback, or idempotent writes.

1. Why does data skew cause Checkpoint timeout

Checkpoint relies on a barrier alignment mechanism between operators (why alignment is necessary is explained later). The following figure illustrates the alignment process:

When barriers have been emitted on both inputs and barrier1 reaches the operator before barrier2, the operator buffers the input elements arriving on barrier1's side until barrier2 arrives, and only then does it checkpoint.

Once an operator has aligned the barriers, it stores its state asynchronously and then forwards the barrier downstream. Each operator notifies the CheckpointCoordinator when its part of the Checkpoint finishes; once the CheckpointCoordinator learns that the Checkpoint of every operator has completed, the Checkpoint as a whole is complete.

In our application, one map operator received a disproportionate share of the data, so its barrier was held back and the Checkpoint timed out.
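To make the relevant knobs concrete, here is a minimal configuration sketch using Flink's DataStream API; the interval and timeout values below are purely illustrative.

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigExample {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Inject a barrier into the stream every 60 seconds.
        env.enableCheckpointing(60_000L);

        CheckpointConfig cfg = env.getCheckpointConfig();
        // Exactly-once relies on the barrier alignment described above.
        cfg.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Abort a checkpoint that has not completed within 10 minutes; a skewed
        // operator can stall alignment long enough to hit this limit.
        cfg.setCheckpointTimeout(10 * 60 * 1000L);
    }
}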

2. How does Checkpoint work

For details, see Flink’s paper Lightweight Asynchronous Snapshots for Distributed Dataflows. In short, early fault-tolerance schemes for stream processing took periodic snapshots of the global state, which had two drawbacks:

  • Blocked computation: the snapshot was taken synchronously, stalling processing.
  • A snapshot included both the unprocessed and the processed records of the current operator, so it became extremely large.

Flink builds on the Chandy-Lamport algorithm. Chandy-Lamport takes snapshots asynchronously and requires the data source to be replayable, but it still records in-flight upstream data. The scheme proposed by Flink avoids storing in-flight data for acyclic dataflow graphs.

In Flink, a Barrier marker is injected periodically to tell downstream operators to start taking a snapshot. The algorithm rests on the following premises (see the alignment sketch after the list):

  • Network transport is reliable and preserves FIFO order. A channel can be blocked and unblocked: while blocked, it buffers all data received from upstream and delivers nothing until it receives the unblocked signal.
  • A task can perform the following operations on its channels: block, unblock, send messages, and broadcast messages.
  • A Source node is abstracted as reading from a Nil input channel.
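To make the block/unblock mechanics concrete, here is an illustrative sketch of alignment on a two-input operator. Every name in it is invented for the example; this is not Flink source code.

import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

public class AlignmentSketch {
    private final Set<Integer> allChannels = Set.of(0, 1);
    private final Set<Integer> blockedChannels = new HashSet<>();
    private final Queue<String> bufferedRecords = new ArrayDeque<>();

    void onRecord(int channel, String record) {
        if (blockedChannels.contains(channel)) {
            bufferedRecords.add(record); // blocked: buffer instead of processing
        } else {
            process(record);
        }
    }

    void onBarrier(int channel, long checkpointId) {
        blockedChannels.add(channel); // block this channel until alignment finishes
        if (blockedChannels.containsAll(allChannels)) {
            snapshotStateAsync(checkpointId);  // asynchronous state snapshot
            emitBarrierDownstream(checkpointId);
            blockedChannels.clear();           // unblock: drain the buffered records
            while (!bufferedRecords.isEmpty()) {
                process(bufferedRecords.poll());
            }
        }
    }

    private void process(String record) { /* user logic */ }
    private void snapshotStateAsync(long checkpointId) { /* hand off to async writer */ }
    private void emitBarrierDownstream(long checkpointId) { /* broadcast the barrier */ }
}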

3. Checkpoint implementation

In Flink, Checkpoint consists of the following steps:

  1. Feasibility check
  2. JobMaster tells Task to trigger a checkpoint
  3. TaskExecutor performs checkpoints
  4. JobMaster confirms the checkpoint

Next, let’s follow the source code to see the concrete implementation.

3.1 Feasibility Check

Reference code: CheckpointCoordinator#startTriggeringCheckpoint.

  1. Ensure that the job is neither shut down nor not yet started (see CheckpointPlanCalculator#calculateCheckpointPlan).
  2. Generate a new checkpoint ID and create a PendingCheckpoint; when all tasks have finished their Checkpoint, it is converted into a CompletedCheckpoint. A timer task is also registered that checks for timeout and aborts the current Checkpoint if it occurs, as sketched after this list (see CheckpointCoordinator#createPendingCheckpoint).
  3. Trigger the MasterHooks. Some external systems need to run extension logic before a checkpoint is triggered; MasterHook provides that notification mechanism (see CheckpointCoordinator#snapshotMasterState).
  4. Repeat the checks of step 1 and, if everything is fine, notify the SourceStreamTasks to start triggering their checkpoints (see CheckpointCoordinator#triggerCheckpointRequest).
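The timeout guard in step 2 can be illustrated with a small self-contained toy (again, not Flink code): think of a PendingCheckpoint as a future that either completes when all acknowledgements arrive or is failed by a scheduled canceller.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CheckpointTimeoutGuardDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // Stand-in for a PendingCheckpoint: completes when every task has acked.
        CompletableFuture<Void> pending = new CompletableFuture<>();

        // Register the canceller that aborts the checkpoint on timeout.
        timer.schedule(() -> {
            pending.completeExceptionally(new TimeoutException("Checkpoint expired"));
        }, 500, TimeUnit.MILLISECONDS);

        try {
            pending.get(); // no task ever acks in this demo, so the guard fires
        } catch (ExecutionException e) {
            System.out.println("Aborted pending checkpoint: " + e.getCause().getMessage());
        } finally {
            timer.shutdownNow();
        }
    }
}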

3.2 JobMaster notifies Task to trigger checkpoint

In CheckpointCoordinator#triggerCheckpointRequest, Execution#triggerCheckpoint is invoked via the triggerTasks method. An Execution corresponds to a Task instance, so the JobMaster can locate the TaskManagerGateway through the slot reference held in the Execution and send a remote Checkpoint request.

3.3 TaskManager performs checkpoints

In the code, the TaskManager appears as TaskExecutor. When the JobMaster's remote request reaches the TaskExecutor, it is handled by TaskExecutor#triggerCheckpoint, which in turn calls Task#triggerCheckpointBarrier. That method:

  1. Performs some checks, such as whether the Task is in the Running state.
  2. Triggers the checkpoint by invoking CheckpointableTask#triggerCheckpointAsync.
  3. Executes the checkpoint in CheckpointableTask#triggerCheckpointAsync. Taking StreamTask as an example: if the upstream has already finished, the downstream Checkpoint is triggered by injecting a CheckpointBarrier; if the task is not finished, StreamTask#triggerCheckpointAsyncInMailbox is called (see the mailbox sketch after this list). Either way, execution eventually reaches SubtaskCheckpointCoordinator#checkpointState, which triggers the Checkpoint.
  4. Saves the operator snapshots: OperatorChain#broadcastEvent is called, and OperatorState and KeyedState are saved.
  5. Calls SubtaskCheckpointCoordinatorImpl#finishAndReportAsync to report asynchronously that the current snapshot is complete.
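The "mailbox" in StreamTask#triggerCheckpointAsyncInMailbox refers to the task's single-threaded execution model: checkpoint actions are enqueued like mail and executed by the same thread that processes records, so the two never race. A toy illustration of that idea (not Flink code):

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class MailboxDemo {
    public static void main(String[] args) throws InterruptedException {
        // One thread plays the role of the task's mailbox executor.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        mailbox.submit(() -> System.out.println("process record 1"));
        // The checkpoint trigger is just another mail in the queue.
        mailbox.submit(() -> System.out.println("trigger checkpoint 42"));
        mailbox.submit(() -> System.out.println("process record 2"));
        mailbox.shutdown();
        mailbox.awaitTermination(1, TimeUnit.SECONDS);
    }
}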

3.4 JobMaster confirms checkpoint

|-- RpcCheckpointResponder
  \-- acknowledgeCheckpoint
|-- JobMaster
  \-- acknowledgeCheckpoint
|-- SchedulerBase
  \-- acknowledgeCheckpoint
|-- ExecutionGraphHandler
  \-- acknowledgeCheckpoint
|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage

In 3.1, we mentioned the PendingCheckpoint. It maintains state to track whether the acknowledgements from all tasks and from the Master hooks have arrived. When the CheckpointCoordinator confirms that the Checkpoint is complete, it notifies all tasks of the completion.

|-- CheckpointCoordinator
  \-- receiveAcknowledgeMessage
  \-- sendAcknowledgeMessages
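The bookkeeping behind receiveAcknowledgeMessage boils down to tracking which tasks have not yet acknowledged and flipping the checkpoint to completed once the set is empty. A toy sketch of the idea, with made-up task names:

import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class AckTrackingDemo {
    public static void main(String[] args) {
        Set<String> notYetAcknowledged = new HashSet<>(List.of("source-0", "map-0", "sink-0"));

        for (String task : List.of("source-0", "map-0", "sink-0")) {
            notYetAcknowledged.remove(task); // one receiveAcknowledgeMessage per task
            System.out.println(task + " acked, remaining: " + notYetAcknowledged);
            if (notYetAcknowledged.isEmpty()) {
                // PendingCheckpoint becomes a CompletedCheckpoint,
                // then acknowledge messages are sent to all tasks.
                System.out.println("Checkpoint complete, notifying all tasks");
            }
        }
    }
}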

3.5 Checkpoint Restoration

This part of the code is relatively straightforward; interested readers can follow it along the call stack below.

|-- Task
  \-- run
  \-- doRun
|-- StreamTask
  \-- invoke
  \-- restoreInternal
  \-- restoreGates
|-- OperatorChain
  \-- initializeStateAndOpenOperators
|-- StreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- AbstractStreamOperator
  \-- initializeState
|-- StreamOperatorStateHandler
  \-- initializeOperatorState
|-- CheckpointedStreamOperator
  \-- initializeState  # calls user code

3.6 End to End Exactly Once

Implementing end-to-end exactly-once is actually quite difficult: consider a scenario with one Source and N Sinks. Flink therefore designed a set of interfaces to support end-to-end exactly-once semantics, namely:

  • TwoPhaseCommitSinkFunction: a Sink that wants exactly-once semantics must implement this interface.
  • CheckpointedFunction: hooks invoked while a Checkpoint is taken.
  • CheckpointListener: as the name implies, implementers of this interface are notified when a Checkpoint completes or fails. A minimal example follows this list.
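A minimal sketch of the latter two interfaces on a sink, assuming the DataStream API (note that CheckpointListener lived in org.apache.flink.runtime.state before Flink 1.12):

import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

// Buffers records and only treats them as published once the covering
// checkpoint is known to be globally complete.
public class BufferingSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void invoke(String value, Context context) {
        buffer.add(value); // staged, not yet visible externally
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) {
        // Hook: runs while the checkpoint is being taken. A real sink would
        // persist `buffer` into operator state here so it survives a restore.
    }

    @Override
    public void initializeState(FunctionInitializationContext context) {
        // Hook: runs on start and on restore; re-read snapshotted state here.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Hook: the whole checkpoint succeeded, so publishing is now safe.
        buffer.clear();
    }
}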

At present, the only connector with exactly-once implementations for both Source and Sink is Kafka: its source supports resuming from a recorded offset, and its sink supports rollback (transactions) or idempotent writes. Interested readers can study its implementation of these interfaces.

Some readers may wonder why the JDBC Sink does not implement exactly-once. The nature of this interface is at odds with how JDBC transactions are used: once an operator exits abnormally, the transactions it opened earlier can no longer be operated on, so the retryCommit and retryRollback of TwoPhaseCommitSinkFunction cannot be carried out; see github.com/apache/flin…
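For reference, here is a minimal skeleton of a TwoPhaseCommitSinkFunction that stages data in temporary files; the staging semantics are hypothetical and only meant to show the template-method hooks.

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// The transaction handle is a plain String (a temp-file path); no shared
// context object is needed, hence Void.
public class TempFileTwoPhaseSink
        extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public TempFileTwoPhaseSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        return "/tmp/txn-" + System.nanoTime(); // open a fresh staging location
    }

    @Override
    protected void invoke(String txn, String value, Context context) {
        // append `value` to the staging file identified by `txn`
    }

    @Override
    protected void preCommit(String txn) {
        // phase 1: flush and fsync, so data is durable but not yet visible
    }

    @Override
    protected void commit(String txn) {
        // phase 2: atomically publish, e.g. rename the staging file;
        // must be idempotent because it may be retried after recovery
    }

    @Override
    protected void abort(String txn) {
        // roll back: delete the staging file
    }
}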

4. Summary

Starting from a concrete problem, this article walked through the principle and implementation of Checkpoint, with a light trace of the relevant source code. The main code path is fairly clear, but it involves a large number of classes; attentive readers may have noticed the single responsibility principle at work. TwoPhaseCommitSinkFunction, for its part, is a typical realization of the template method design pattern.