Flink

End-to-end exactly-once processing in Flink has to be guaranteed at three points:

  • Source end: when data enters Flink from the upstream system, it must be consumed exactly once.

  • Inside Flink: we already know that the Checkpoint mechanism persists operator state so that, when a failure occurs, the state can be restored to guarantee internal consistency. If you are not familiar with it, take a look at my previous post:

    The checkpoint mechanism, the cornerstone of Flink reliability, is analyzed in detail

  • Sink end: the results must be delivered to the downstream system exactly once.

Prior to Flink 1.4, exactly-once processing was limited to the inside of a Flink application: all operator state was saved and managed by Flink, which is what made exactly-once possible internally. In practice, however, Flink usually has to send its results to an external system after processing, for example a Kafka sink, and for that last step Flink could not guarantee exactly-once processing.

Flink 1.4 formally introduced a landmark feature: the two-phase commit sink, i.e. the TwoPhaseCommitSinkFunction. This sink function extracts and encapsulates the common logic of the two-phase commit protocol; combined with specific sources and sinks (such as Kafka 0.11), it lets Flink achieve end-to-end exactly-once semantics (EOS, Exactly-Once Semantics).
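To make this concrete, here is a minimal sketch, not taken from the Flink source, of what a custom sink built on TwoPhaseCommitSinkFunction can look like. The class TransactionalFileSink and the MyTransaction helper are hypothetical, and the constructor and invoke signatures vary slightly between Flink versions:

```java
// Hypothetical sketch of a two-phase-commit sink; MyTransaction is a made-up handle
// standing in for e.g. a Kafka producer transaction or a staged temp file.
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

public class TransactionalFileSink
        extends TwoPhaseCommitSinkFunction<String, TransactionalFileSink.MyTransaction, Void> {

    public TransactionalFileSink() {
        super(new KryoSerializer<>(MyTransaction.class, new ExecutionConfig()),
              VoidSerializer.INSTANCE);
    }

    @Override
    protected MyTransaction beginTransaction() throws Exception {
        return new MyTransaction();          // open a fresh transaction / staging area
    }

    @Override
    protected void invoke(MyTransaction txn, String value, Context context) throws Exception {
        txn.write(value);                    // buffer the record inside the open transaction
    }

    @Override
    protected void preCommit(MyTransaction txn) throws Exception {
        txn.flush();                         // phase 1: flush, but keep the data invisible
    }

    @Override
    protected void commit(MyTransaction txn) {
        txn.publish();                       // phase 2: atomically make the data visible
    }

    @Override
    protected void abort(MyTransaction txn) {
        txn.discard();                       // roll back the staged data on failure
    }

    /** Hypothetical transaction handle used only for this illustration. */
    public static class MyTransaction implements java.io.Serializable {
        void write(String v) { /* append to a staging buffer */ }
        void flush()         { /* persist the staged records */ }
        void publish()       { /* make the staged records visible downstream */ }
        void discard()       { /* delete the staged records */ }
    }
}
```

The key point is that begin/pre-commit/commit/abort line up one-to-one with the phases of the two-phase commit protocol described later in this post.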

End-to-end exactly-once processing semantics (EOS)

The following applies to Flink 1.4 and later

For the Source end: the data ultimately lands inside Flink, so Flink only needs to save the offsets of the consumed data. For example, when a Kafka consumer is used as the Source, Flink stores its offsets; if a failure occurs later, the connector can reset the offsets and re-consume the data, which guarantees consistency during recovery.
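As a minimal sketch of the Source side (assuming Flink 1.4+ and the Kafka 0.11 connector; the topic, broker address, group id and checkpoint interval below are placeholder assumptions, and import paths differ slightly between Flink versions):

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

public class KafkaSourceCheckpointExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Enable checkpointing: the consumer's offsets are stored in checkpointed state,
        // so on failure the connector rewinds to the last checkpointed offset and re-reads.
        env.enableCheckpointing(60_000);                          // assumed 60 s interval

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");     // placeholder broker
        props.setProperty("group.id", "flink-eos-demo");          // placeholder group id

        env.addSource(new FlinkKafkaConsumer011<>(
                "input-topic", new SimpleStringSchema(), props))
           .print();

        env.execute("kafka-source-with-checkpointing");
    }
}
```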

For the Sink end: the Sink is the most complicated part, because once the data leaves Flink and lands in another system, Flink can no longer track it. Exactly-once semantics must therefore also cover the external system that Flink writes to: that system has to provide a way to commit or roll back the writes, and this mechanism must be coordinated with Flink's Checkpoint (Kafka 0.11 already provides this through its transaction support).

Let’s take Flink and Kafka as an example. Flink reads data from Kafka and writes the data to Kafka.

The first reason for using Kafka as the example is that most Flink deployments today read from and write to Kafka. The second, and more important, reason is that Kafka 0.11 officially introduced transaction support, which is what makes end-to-end exactly-once semantics possible for Flink applications that interact with Kafka.

Of course, Flink's support for exactly-once semantics is not limited to Kafka; any Source/Sink can be used as long as it provides the necessary coordination mechanism.
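On the Sink side, the corresponding sketch is a Kafka 0.11 producer created with exactly-once semantics, so that its writes are wrapped in Kafka transactions. The topic, broker address and transaction timeout below are placeholder assumptions:

```java
import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class ExactlyOnceKafkaSink {
    public static FlinkKafkaProducer011<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");    // placeholder broker
        // The producer's transaction timeout must not exceed the broker's
        // transaction.max.timeout.ms (15 minutes by default on the broker side).
        props.setProperty("transaction.timeout.ms", "600000");

        return new FlinkKafkaProducer011<>(
                "output-topic",                                   // placeholder topic
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);     // transactional writes
    }
}
```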

Flink and Kafka

As shown in the figure above, the Flink application contains the following components (a code sketch of this pipeline follows the list):

  1. A Source that reads data from Kafka

  2. A time-windowed aggregation operation

  3. A Sink that writes the results to Kafka.
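A sketch of these three components wired together might look as follows; the topics, addresses, one-minute window and the counting logic are illustrative assumptions, not the exact job from the figure:

```java
import java.util.Properties;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

public class KafkaToKafkaEosJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000, CheckpointingMode.EXACTLY_ONCE);

        // One Properties object shared by consumer and producer for brevity.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "kafka:9092");     // placeholder
        props.setProperty("group.id", "eos-demo");                // placeholder
        props.setProperty("transaction.timeout.ms", "600000");    // used by the producer

        // 1. Source that reads from Kafka (offsets go into checkpointed state).
        DataStream<String> lines = env.addSource(
                new FlinkKafkaConsumer011<>("input-topic", new SimpleStringSchema(), props));

        // 2. Time-windowed aggregation: count records per 1-minute window (illustrative).
        DataStream<String> counts = lines
                .map(line -> 1L).returns(Types.LONG)
                .timeWindowAll(Time.minutes(1))
                .reduce((a, b) -> a + b)
                .map(c -> "count=" + c).returns(Types.STRING);

        // 3. Exactly-once sink that writes the results back to Kafka in a transaction.
        counts.addSink(new FlinkKafkaProducer011<>(
                "output-topic",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));

        env.execute("kafka-to-kafka-exactly-once");
    }
}
```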

For the Sink to support exactly-once semantics (EOS), it must write data to Kafka within a transaction, so that all writes between two Checkpoints are committed together when the transaction commits. This ensures that the writes can be rolled back in the event of a failure or crash.

Of course, in a distributed application with multiple concurrently running sink instances, a simple commit or rollback is not enough: all components must agree on whether to commit or roll back in order to guarantee a consistent result. Flink solves this with a two-phase commit protocol and its pre-commit phase.

Two-phase Commit Protocol (2PC)

The two-phase commit protocol (2PC) is a common approach to solving distributed transaction problems. It ensures that in a distributed transaction either all participating processes commit the transaction or all of them cancel it, which implements the A (atomicity) in ACID.

In the context of data consistency, this means that either all replicas apply a given change or none of them do, which is what keeps the data consistent.

There are two important roles in the two-phase commit protocol: the Coordinator and the Participants. There is exactly one Coordinator, which coordinates and manages the distributed transaction, and there are multiple Participants.

As the name suggests, two-phase commit divides the commit process into two consecutive phases: the voting phase and the commit phase.

The two-phase commit protocol flow is shown in the figure below; a toy code sketch follows the description of the two phases:

Phase 1: Voting phase

  1. The coordinator sends a VOTE_REQUEST message to all participants.

  2. When the participant receives a VOTE_REQUEST message, it sends a VOTE_COMMIT message to the coordinator in response, telling the coordinator that it is ready to commit. If the participant is not ready or encounters other failures, it returns a VOTE_ABORT message telling the coordinator that the transaction cannot commit at this time.

Phase 2: Commit phase

  1. The coordinator collects the votes from all participants. If every participant agrees that the transaction can be committed, the coordinator decides to commit the transaction and sends a GLOBAL_COMMIT message to all participants, telling them to commit locally. If any participant returns a VOTE_ABORT message, the coordinator cancels the transaction and broadcasts a GLOBAL_ABORT message telling all participants to cancel it.

  2. Each participant that has cast its vote waits for the coordinator's reply. If it receives a GLOBAL_COMMIT message, it commits the local transaction; if it receives a GLOBAL_ABORT message, it cancels the local transaction.
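The following toy, single-process sketch mirrors the two phases above. It is purely illustrative: real 2PC runs over a network with timeouts and durable logs, and none of the names below correspond to a Flink or Kafka API:

```java
import java.util.List;

public class TwoPhaseCommitDemo {

    /** A participant votes in phase 1 and applies the global decision in phase 2. */
    interface Participant {
        boolean onVoteRequest();       // true = VOTE_COMMIT, false = VOTE_ABORT
        void onGlobalCommit();         // commit the local transaction
        void onGlobalAbort();          // cancel the local transaction
    }

    /** The coordinator collects votes and broadcasts the global decision. */
    static boolean runTransaction(List<Participant> participants) {
        // Phase 1: voting. Send VOTE_REQUEST to every participant and collect the votes.
        boolean allVotedCommit = true;
        for (Participant p : participants) {
            if (!p.onVoteRequest()) {
                allVotedCommit = false;    // any VOTE_ABORT dooms the transaction
            }
        }

        // Phase 2: commit. Broadcast GLOBAL_COMMIT only if every vote was VOTE_COMMIT,
        // otherwise broadcast GLOBAL_ABORT so that all participants roll back.
        for (Participant p : participants) {
            if (allVotedCommit) {
                p.onGlobalCommit();
            } else {
                p.onGlobalAbort();
            }
        }
        return allVotedCommit;
    }
}
```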

Application of two-phase commit protocol in Flink

Flink's two-phase commit approach:

Let's walk through Flink's exactly-once processing from the moment the Flink program starts, through the consumption of Kafka data, to the point where Flink sinks the data back into Kafka.

  1. When a Checkpoint starts, the JobManager injects a Checkpoint barrier into the data stream, and the barrier is passed from operator to operator, as shown below:

  2. The Source end: the Flink Kafka Source is responsible for saving the Kafka consumption offsets. When the Checkpoint succeeds, Flink commits the saved offsets; if the Checkpoint fails, they are discarded. The Source then passes the checkpoint barrier to the next Operator, and each Operator takes a snapshot of its current state and saves it to the State Backend.

    For the Source task, the current offset is saved as state. The next time the Source task recovers from a Checkpoint, it can reset to that offset and re-consume the data from where it left off, as shown in the following figure:

  3. The Sink end: downstream of the Source, every internal transform task stores its state into the checkpoint when it encounters the checkpoint barrier. When the data reaches the Sink, the Sink task first writes the data into the external Kafka; this data belongs to a pre-committed transaction and cannot be consumed yet. During this pre-commit phase, the Data Sink must pre-commit its external transaction in addition to saving its state to the state backend. As shown below:

  4. When the snapshots of all operator tasks are complete (all snapshots together constitute one Checkpoint), the JobManager sends a notification to all tasks confirming that the Checkpoint has completed. At this point the pre-commit phase is over, and the second phase of the two-phase commit protocol, the commit phase, begins. In this phase the JobManager triggers the Checkpoint-completed callback logic for each Operator in the application.

    In this example, the Data Source and the window operation have no external state, so these two Operators do not need to execute any logic at this stage. The Data Sink, however, does have external state, and its external transaction must now be committed: when the Sink task receives the confirmation notification, it formally commits the previous transaction, the uncommitted data in Kafka becomes committed, and the data can actually be consumed, as shown below (a sketch of this callback mapping follows the list):
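For intuition, the sketch below shows how the checkpoint lifecycle maps onto the two commit phases when wired up by hand instead of using TwoPhaseCommitSinkFunction. It is an illustration only: the method bodies are comments, and the CheckpointListener package has moved between Flink versions:

```java
// Hand-rolled illustration of the pre-commit / commit mapping; not Flink's built-in
// TwoPhaseCommitSinkFunction and not production code.
import org.apache.flink.runtime.state.CheckpointListener;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class ManualTwoPhaseSink extends RichSinkFunction<String>
        implements CheckpointedFunction, CheckpointListener {

    @Override
    public void invoke(String value) {
        // Write into the currently open (pre-committed, not yet visible) transaction.
    }

    @Override
    public void snapshotState(FunctionSnapshotContext ctx) {
        // Phase 1 (pre-commit): runs when the checkpoint barrier reaches the sink.
        // Flush the open transaction and start a new one for subsequent records.
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) {
        // Phase 2 (commit): the JobManager has confirmed that every task's snapshot
        // succeeded, so the pre-committed transaction can now be made visible.
    }

    @Override
    public void initializeState(FunctionInitializationContext ctx) {
        // On recovery: commit any transaction that was pre-committed before the failure
        // and abort anything newer, mirroring the rollback described above.
    }
}
```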

Note: the JobManager coordinates the Checkpointing of every TaskManager, and the Checkpoint data is stored in the StateBackend. By default the StateBackend is memory based, but it can be changed to file-based persistent storage.
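For example, switching from the default in-memory backend to a durable filesystem backend can look like this (the HDFS path is a placeholder assumption):

```java
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DurableStateBackendExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Checkpoint metadata and operator state snapshots are written under this path.
        env.setStateBackend(new FsStateBackend("hdfs://namenode:8020/flink/checkpoints"));
    }
}
```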

Finally, a graph summarizes Flink’s EOS:

This diagram is worth saving: it is a comprehensive yet concise summary, and with it you will never be stumped by an interviewer!