
1. Foreword

Stateful computation is an important feature of stream processing frameworks, because more complex stream processing scenarios need to record state and then update it based on incoming data. The following scenarios require the state functionality of stream processing:

  • Duplicate data exists in the data stream. To remove duplicates, you need to record which data has already entered the application; when new data arrives, deduplication is decided against that recorded data.
  • To check whether the input stream conforms to a particular pattern, you need to cache previously arrived elements as state. For example, determining whether the temperature in a sensor data stream keeps rising.
  • Aggregate analysis of data in a time window, for example computing the 75th or 99th percentile of an indicator over an hour.

A state update and access process is shown in the figure below. An operator subtask receives the input stream, obtains the corresponding state, and updates the state according to the new computation result. A simple example is summing an integer field of the input stream within a time window: when the operator subtask receives a new element, it retrieves the value already stored in the state, adds the current input to it, and updates the state.
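To make the summation example concrete, here is a minimal sketch (class and field names are illustrative and not from the original; the windowing aspect is omitted for brevity) that keeps a running sum per key in a ValueState:

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class RunningSumDemo extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, Long>> {

    private transient ValueState<Long> sumState;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Register a ValueState that holds the running sum of the current key
        sumState = getRuntimeContext().getState(new ValueStateDescriptor<>("sum", Long.class));
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, Long>> out) throws Exception {
        Long current = sumState.value();              // value already stored in state (null on first access)
        long updated = (current == null ? 0L : current) + value.f1;
        sumState.update(updated);                     // write the new sum back to state
        out.collect(Tuple2.of(value.f0, updated));
    }
}

Applied after a keyBy, e.g. stream.keyBy(0).flatMap(new RunningSumDemo()), each key keeps its own independent sum.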

2. State types

Flink has two basic types of State: Managed State and Raw State.

Managed State is managed by Flink: Flink helps store, restore, and optimize it. Raw State is managed by the developer and needs to be serialized by the user.

Specific differences are:

  • In terms of state management, Managed State is managed by the Flink runtime: state is automatically stored and recovered, and Flink has made optimizations in storage management and persistence. When you scale horizontally, i.e. modify the parallelism of a Flink application, state is also automatically redistributed across the parallel instances. Raw State is user-defined state.
  • In terms of data structures, Managed State supports a series of common structures such as ValueState, ListState, and MapState. Raw State only supports bytes: any higher-level data structure must be serialized by the user into a byte array before use, and Flink does not know what data structure is being stored.
  • In terms of usage scenarios, most operators can use Managed State by inheriting the Rich function classes or other provided interface classes. Raw State is used when the existing operators and Managed State are insufficient.

The Managed State is further broken down into two types: Keyed State and Operator State.

To customize a Flink operator, you can extend a Rich Function class such as RichFlatMapFunction. When using Keyed State, you create and access the state inside such a Rich Function. For Operator State, the CheckpointedFunction interface additionally needs to be implemented.

2.1 Keyed State

Flink maintains a state instance for each key value and partitions all data with the same key into the same operator subtask, which maintains and processes the state for that key. When a task processes a piece of data, it automatically scopes state access to the key of the current data. Therefore, all data with the same key accesses the same state.

Note that Keyed State can only be used on a KeyedStream, which is obtained via stream.keyBy(…).

Flink provides the following data formats to manage and store Keyed State:

  • ValueState: stores a single value. It can be updated with update(T) and retrieved with value().
  • ListState: stores a list of values. You can add elements with add(T) or addAll(List<T>), replace the whole list with update(List<T>), and read the entire list with get().
  • ReducingState: stores the result computed by a ReduceFunction; add(T) adds an element.
  • AggregatingState: stores the result computed by an AggregateFunction; add(IN) adds an element.
  • FoldingState: has been marked as deprecated and will be removed in a future release; AggregatingState is recommended instead.
  • MapState: maintains map state; get(UK) retrieves, put(UK, UV) updates, contains(UK) checks whether a key exists, and remove(UK) removes an entry.

A ListState example:
import java.util.List;

import com.google.common.collect.Lists;

import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class ListStateDemo extends RichFlatMapFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>> {

    private transient ListState<Tuple2<String, Long>> listState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        // Register the ListState with a descriptor (name + element type)
        ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor<>(
                "listState",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
        );
        listState = getRuntimeContext().getListState(listStateDescriptor);
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
        // Read the current list from state, append the new element and write it back
        List<Tuple2<String, Long>> currentListState = Lists.newArrayList(listState.get().iterator());
        currentListState.add(value);
        listState.update(currentListState);

        out.collect(currentListState);
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 60L), Tuple2.of("a", 70L),
                Tuple2.of("b", 50L), Tuple2.of("b", 60L), Tuple2.of("b", 70L),
                Tuple2.of("c", 50L), Tuple2.of("c", 60L), Tuple2.of("c", 70L));

        dataStream
                .keyBy(0)
                .flatMap(new ListStateDemo())
                .print();

        senv.execute(ListStateDemo.class.getSimpleName());
    }
}

2.2 Operator State

Operator State can be used with all operators. Each operator subtask (instance) shares one piece of state, which can be accessed and updated by all data flowing into that subtask.

Operator State cannot be accessed by other subtasks of the same operator or by other operators.

Flink provides three basic data structures for operator states:

  • ListState: stores state as a list.
  • UnionListState: also stores state as a list. The difference from ListState appears when the parallelism changes: ListState collects all parallel state instances of the operator and splits them evenly among the new subtasks, while UnionListState simply merges all parallel state instances and hands the complete union to every subtask, leaving the concrete partitioning to the user.
  • BroadcastState: operator state used for broadcasting. If an operator has multiple subtasks and each subtask's state is identical, this special case is best suited for broadcast state (see the sketch after this list).
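As an additional illustration of BroadcastState (this sketch is not part of the original example; the stream names and the threshold logic are made up), a stream of per-key thresholds can be broadcast so that every subtask of the main stream holds the same rule state:

import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class BroadcastStateSketch {

    public static DataStream<String> applyThresholds(DataStream<Tuple2<String, Long>> dataStream,
                                                     DataStream<Tuple2<String, Long>> thresholdStream) {
        // Descriptor for the broadcast state: metric name -> threshold
        final MapStateDescriptor<String, Long> thresholds =
                new MapStateDescriptor<>("thresholds", Types.STRING, Types.LONG);

        return dataStream
                .connect(thresholdStream.broadcast(thresholds))
                .process(new BroadcastProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> value, ReadOnlyContext ctx,
                                               Collector<String> out) throws Exception {
                        // Every subtask reads the same broadcast state
                        Long threshold = ctx.getBroadcastState(thresholds).get(value.f0);
                        if (threshold != null && value.f1 > threshold) {
                            out.collect(value.f0 + " exceeded threshold " + threshold);
                        }
                    }

                    @Override
                    public void processBroadcastElement(Tuple2<String, Long> rule, Context ctx,
                                                        Collector<String> out) throws Exception {
                        // Broadcast elements update the state identically on every subtask
                        ctx.getBroadcastState(thresholds).put(rule.f0, rule.f1);
                    }
                });
    }
}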

Assume that we do not need to distinguish the types of monitoring data here: as soon as the monitoring data exceeds the threshold the specified number of times, an alert is generated:

public class OperateStateDemo extends RichFlatMapFunction<Tuple2<String, Long>, List<Tuple2<String, Long>>>
        implements CheckpointedFunction {

    private final int threshold;
    private transient ListState<Tuple2<String, Long>> checkpointedState;
    private List<Tuple2<String, Long>> bufferedElements;

    public OperateStateDemo(int threshold) {
        this.threshold = threshold;
        this.bufferedElements = new ArrayList<>();
    }

    @Override
    public void flatMap(Tuple2<String, Long> value, Collector<List<Tuple2<String, Long>>> out) throws Exception {
        bufferedElements.add(value);
        if (bufferedElements.size() == threshold) {
            out.collect(bufferedElements);
            bufferedElements.clear();
        }
    }

    /**
     * Takes the checkpoint snapshot: copies the in-memory buffer into the operator state.
     */
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        checkpointedState.clear();
        for (Tuple2<String, Long> element : bufferedElements) {
            checkpointedState.add(element);
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<Tuple2<String, Long>> listStateDescriptor = new ListStateDescriptor<>(
                "listState",
                TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})
        );
        checkpointedState = context.getOperatorStateStore().getListState(listStateDescriptor);
        // For failure recovery: reload the buffered elements from the restored state
        if (context.isRestored()) {
            for (Tuple2<String, Long> element : checkpointedState.get()) {
                bufferedElements.add(element);
            }
            checkpointedState.clear();
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.getCheckpointConfig().setCheckpointInterval(500);
        DataStream<Tuple2<String, Long>> dataStream = senv.fromElements(
                Tuple2.of("a", 50L), Tuple2.of("a", 60L), Tuple2.of("a", 70L),
                Tuple2.of("b", 50L), Tuple2.of("b", 60L), Tuple2.of("b", 70L),
                Tuple2.of("c", 50L), Tuple2.of("c", 60L), Tuple2.of("c", 70L));

        dataStream
                .flatMap(new OperateStateDemo(2))
                .print();

        senv.execute(OperateStateDemo.class.getSimpleName());
    }
}

3. Horizontal scaling of state

The horizontal scalability problem of state mainly refers to changing the parallelism of a Flink application, or more precisely, changing the number of parallel instances (subtasks) of each operator. When the application has to shut down or start operator subtasks, the state data held by the original subtasks needs to be migrated smoothly to the new subtasks.

Flink's checkpoint mechanism is a very good way to migrate state data between operators: the local state of each operator is snapshotted and saved to distributed storage such as HDFS. After rescaling, the number of operator subtasks changes, the subtasks are restarted, and the corresponding state is restored from the distributed storage.

Keyed State and Operator State are rescaled in different ways. Since every piece of Keyed State corresponds to a key, and a key is always routed to a specific operator subtask after rescaling, Keyed State automatically migrates between the parallel subtasks. For a non-KeyedStream, the data flowing into an operator subtask may change as the parallelism changes: if the application originally has a parallelism of 2, the data is split into two parallel streams feeding two operator subtasks, each holding its own state; if the parallelism is changed to 3 the stream is split three ways, if it is changed to 1 there is a single stream, and the state storage changes accordingly. For this scaling problem, Operator State can be redistributed in two ways: split evenly across the new subtasks, or combined into a union whose entirety is handed to every instance.
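As a minimal sketch of the two redistribution modes for list-style Operator State, assuming the code sits inside CheckpointedFunction.initializeState(FunctionInitializationContext context) as in the OperateStateDemo above (in a real function you would pick one of the two):

// Even-split redistribution: on rescaling, the union of all entries is divided evenly among the new subtasks
ListState<Tuple2<String, Long>> splitState = context.getOperatorStateStore().getListState(
        new ListStateDescriptor<>("split-state", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));

// Union redistribution: on rescaling, every subtask receives the complete union and decides what to keep
ListState<Tuple2<String, Long>> unionState = context.getOperatorStateStore().getUnionListState(
        new ListStateDescriptor<>("union-state", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {})));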

4. Checkpoint mechanism

To provide fault tolerance, Flink offers a checkpoint mechanism. With checkpoints, Flink periodically injects checkpoint barriers into the data stream. When an operator receives a barrier, it takes a snapshot of its current state and then passes the barrier downstream; the downstream operator, after receiving the barrier, likewise snapshots its current state, and so on until the barrier reaches the final sink operator. When a failure occurs, Flink can restore all operators to their previous state from the latest snapshot data.

4.1 Enabling checkpointing

Checkpointing is disabled by default. It is enabled by calling enableCheckpointing(n) on the StreamExecutionEnvironment, where n is the checkpoint interval in milliseconds.

Checkpointing is the core of Flink's fault-tolerance mechanism. According to the configuration, it periodically takes snapshots of the state of each operator in the stream and stores this state data persistently. If the Flink program crashes unexpectedly, the program can selectively recover from these snapshots when it is rerun, correcting any state inconsistency caused by the failure. A quick look at the Flink checkpoint mechanism, as shown below:

When checkpointing is enabled, Flink inserts barrier markers into the distributed stream sources at runtime; these barriers flow downstream together with the data records in the stream. When an operator receives a barrier, it suspends processing of newly arriving records. Because an operator can have multiple input streams, each carrying its own barrier, the operator waits until the barriers of all input streams have arrived; once they have, the barriers are effectively at the same point in time (they are aligned). While waiting for all barriers to arrive, the operator's buffer may already cache some records. The operator then emits the records that precede the barrier to the downstream operators as their input, and emits the snapshot corresponding to the barrier as the checkpoint result.

Other attributes of Checkpoint include:

  • Exactly-once versus at-least-once: you can choose between the two guarantee levels by passing a mode to enableCheckpointing(long interval, CheckpointingMode mode). Exactly-once is the right choice for most applications; at-least-once may be relevant for applications that require ultra-low latency (consistently a few milliseconds).
  • Checkpoint timeout: if a checkpoint is not completed within the specified time, the in-progress checkpoint is discarded.
  • Minimum pause between checkpoints: defines how soon after one checkpoint completes the next may start. If this is set to 5000, the next checkpoint starts at least five seconds after the previous one finished, regardless of the checkpoint duration or interval.
  • Number of concurrent checkpoints: by default, the system does not trigger a new checkpoint while the previous one is still in progress (neither failed nor succeeded yet). This ensures that the topology does not spend too much time on checkpoints at the expense of normal processing. Allowing several checkpoints to run in parallel can still make sense for pipelines with a fixed processing delay (for example, calls to external services) that need frequent checkpoints to minimize reprocessing after a failure.
  • Externalized checkpoints: you can configure periodic checkpoints to be stored in an external system. Externalized checkpoints write their metadata to persistent storage and are not automatically deleted when the job fails, so a checkpoint is available to recover from if the job fails. See the deployment documentation on externalized checkpoints for details.
  • Fail or continue the task on checkpoint errors: determines whether a task fails when an error occurs during checkpointing. Failing the task is the default behaviour; if disabled, the task simply reports the checkpoint error to the checkpoint coordinator and keeps running.
  • Prefer checkpoints for recovery: determines whether the job falls back to the latest checkpoint for recovery even when a more recent savepoint is available, which can reduce recovery time (restoring from a checkpoint is faster than from a savepoint).
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Checkpoint every 1000 ms
env.enableCheckpointing(1000);
// Advanced options:
// Set the mode to exactly-once (this is the default)
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Make sure at least 500 ms pass between checkpoints
env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500);
// Checkpoints have to complete within one minute or are discarded
env.getCheckpointConfig().setCheckpointTimeout(60000);
// Allow only one checkpoint at a time
env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
// Enable externalized checkpoints that are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
// Allow falling back to a checkpoint even when a more recent savepoint exists
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

Save multiple checkpoints

By default, if checkpointing is enabled, Flink keeps only the latest checkpoint, and when the job fails it can recover from that checkpoint. However, it is more flexible to keep several checkpoints and be able to choose one of them to recover from, based on actual needs. For example, if we discover that the data processing of the last four hours is wrong, we may want to roll the whole state back four hours. Flink supports keeping multiple checkpoints: add the following setting to Flink's conf/flink-conf.yaml configuration file to specify the maximum number of checkpoints to keep:

state.checkpoints.num-retained: 20

This keeps the latest 20 checkpoints. To roll back to a particular checkpoint, specify its path when restoring.

Perform recovery from Checkpoint

To start from a specified checkpoint, for example /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584, it is usually necessary to stop the currently running flink-session first and then start the job with the following command:

./bin/flink run -p 10 -s /flink/checkpoints/workFlowCheckpoint/339439e2a3d89ead4d71ae3816615281/chk-1740584/_metadata -c com.code2144.helper_workflow.HelperWorkFlowStreaming helper_flink-1.0-SNAPSHOT.jar

You can also put the command into a script and run that checkpoint-restore script directly each time.

4.2 Savepoints

A savepoint is a special use of the checkpoint mechanism: it allows you to manually trigger a checkpoint and persist the result to a specified path. It is mainly used to avoid state loss when the Flink cluster is restarted or upgraded. Examples:

Trigger the Savepoint of the job with the specified ID and store the result in the specified directory
bin/flink savepoint :jobId [:targetDirectory]

Manual savepoint

/app/local/flink-1.6.2/bin/flink savepoint 0409251eaff826ef2dd775b6a2d5e219 hdfs://bigdata/path

Successfully triggering a savepoint usually prints: Savepoint completed. Path: hdfs://path...

Manually Canceling a Task

Unlike checkpoints, a savepoint is usually taken when manually stopping a task in order to update the code, rather than simply killing the job; the task is then stopped with the flink cancel ... command:

/app/local/flink-1.6.2/bin/flink cancel 0409251eaff826ef2dd775b6a2d5e219

Start the job from the specified Savepoint

bin/flink run -p 8 -s hdfs:///flink/savepoints/savepoint-567452-9e3587e55980 -c com.code2144.helper_workflow.HelperWorkFlowStreaming jars/BSS-ONSS-Flink-1.0-SNAPSHOT.jar

5. State backend

Flink provides a variety of state backends that specify how and where the state is stored.

State can be stored in Java's heap memory or off-heap memory. Depending on the state backend, Flink can also manage the application's state itself. To allow applications to hold very large state, Flink can manage the memory on its own (spilling to disk if necessary). By default, all Flink jobs use the state backend specified in the flink-conf.yaml configuration file.

However, the default state backend from the configuration file is overridden by a state backend specified in the job itself.

5.1 Classification of state manager

MemoryStateBackend

The default approach, which is based on the JVM’s heap memory, is mainly suitable for local development and debugging.

FsStateBackend

Storage is based on a file system, which can be a local file system or a distributed file system such as HDFS. Note that even with FsStateBackend, in-flight data is still held in the TaskManager's memory; a state snapshot is only written to the specified file system at checkpoint time.

RocksDBStateBackend

RocksDBStateBackend is a third-party state manager built into Flink. It uses the embedded key-value database RocksDB to store in-flight data, and at checkpoint time the data is persisted to a specified file system; therefore, when using RocksDBStateBackend, a file system must be configured for persistent storage. The trade-off is that RocksDB, as an embedded database, offers less safety on its own, but it reads faster than a pure file-system approach and can hold much larger state than a pure in-memory approach, which makes it a balanced solution.

5.2 Configuration methods

Flink supports configuring backend managers in two ways:

The first method is code based and only applies to the current job:

// configure FsStateBackend
env.setStateBackend(new FsStateBackend("hdfs://namenode:40010/flink/checkpoints"));
// configure RocksDBStateBackend
env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:40010/flink/checkpoints"));

When configuring RocksDBStateBackend, you need to import the following additional dependencies:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
    <version>1.9.0</version>
</dependency>

The second method is based on the flink-conf.yaml configuration file. The configuration takes effect for all jobs deployed on the cluster:

state.backend: filesystem
state.checkpoints.dir: hdfs://namenode:40010/flink/checkpoints

6. State consistency

6.1 End-to-end state consistency

In a real application, a stream processing pipeline includes, besides the stream processor itself, a data source (such as Kafka) and output to a persistent system.

End-to-end consistency guarantees mean that correctness holds throughout the whole stream processing application. Each component guarantees its own consistency, and the overall end-to-end consistency level is determined by the weakest of all components. It can be broken down as follows:

  • Internal guarantee: relies on checkpoints
  • Source side: the external source must be able to reset the data read position
  • Sink side: ensures that data is not written to the external system repeatedly during failure recovery

As for sink end, there are two specific implementation methods:

  • Idempotent writes: an idempotent operation can be executed many times but causes only one change to the result; repeating it afterwards has no further effect (see the sketch after this list).
  • Transactional writes: the write to the external system is tied to checkpoints, so results are only written to the sink system when the corresponding checkpoint completes.
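A minimal sketch of an idempotent write, assuming a JDBC table whose primary key is the record key (the connection URL, table, and column names are made up for illustration): because a replayed record simply overwrites the same row, re-emitting results after recovery does not change the final outcome.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class IdempotentJdbcSink extends RichSinkFunction<Tuple2<String, Long>> {

    private transient Connection connection;
    private transient PreparedStatement upsert;

    @Override
    public void open(Configuration parameters) throws Exception {
        // Hypothetical connection settings
        connection = DriverManager.getConnection("jdbc:mysql://db-host:3306/demo", "user", "password");
        // The primary key on `word` makes repeated writes of the same result idempotent
        upsert = connection.prepareStatement(
                "INSERT INTO word_count (word, cnt) VALUES (?, ?) ON DUPLICATE KEY UPDATE cnt = ?");
    }

    @Override
    public void invoke(Tuple2<String, Long> value, Context context) throws Exception {
        upsert.setString(1, value.f0);
        upsert.setLong(2, value.f1);
        upsert.setLong(3, value.f1);
        upsert.executeUpdate();
    }

    @Override
    public void close() throws Exception {
        if (upsert != null) { upsert.close(); }
        if (connection != null) { connection.close(); }
    }
}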

For transactional writes, there are two implementations: write-ahead logging (WAL) and two-phase commit (2PC). The Flink DataStream API provides the GenericWriteAheadSink template class and the TwoPhaseCommitSinkFunction interface, which make it easy to implement transactional writes in either of these two ways.

6.2 Flink+Kafka implements end-to-end exactly-once semantics

Implementing end-to-end state consistency requires every component to play its part. For a Flink + Kafka data pipeline (Kafka in, Kafka out), how does each component guarantee exactly-once semantics?

  • Internal: the checkpoint mechanism saves state to persistent storage; on failure the state can be recovered, guaranteeing internal state consistency
  • Source: the Kafka consumer, as the source, saves its offsets; if a downstream task fails, the connector can reset the offsets and re-consume the data, guaranteeing consistency during recovery
  • Sink: the Kafka producer, as the sink, uses a two-phase-commit sink, which requires implementing TwoPhaseCommitSinkFunction on top of the checkpoint mechanism

EXACTLY_ONCE semantics (EOS for short) means that each incoming message affects the final result exactly once; note that this is about affecting the final result once, not about processing the message only once. Flink has long claimed to support EOS, but this mainly applies inside a Flink application; end-to-end support involving external systems comes with strong requirements:

  • External system writes support idempotency
  • External systems support writes in a transactional manner

Until version 0.11, Kafka only guaranteed at-least-once or at-most-once semantics; starting with 0.11, the introduction of idempotent sends and transactions made EXACTLY_ONCE semantics possible.

| Maven dependency | Supported since (Flink) | Consumer / Producer class | Kafka version | Notes |
| --- | --- | --- | --- | --- |
| flink-connector-kafka-0.8_2.11 | 1.0.0 | FlinkKafkaConsumer08 / FlinkKafkaProducer08 | 0.8.x | Uses Kafka's internal SimpleConsumer API; offsets are committed to ZooKeeper by Flink. |
| flink-connector-kafka-0.9_2.11 | 1.0.0 | FlinkKafkaConsumer09 / FlinkKafkaProducer09 | 0.9.x | Uses the new Kafka Consumer API. |
| flink-connector-kafka-0.10_2.11 | 1.2.0 | FlinkKafkaConsumer010 / FlinkKafkaProducer010 | 0.10.x | Supports producing/consuming messages with timestamps. |
| flink-connector-kafka-0.11_2.11 | 1.4.0 | FlinkKafkaConsumer011 / FlinkKafkaProducer011 | 0.11.x | Kafka 0.11.x no longer supports Scala 2.10. This connector supports Kafka transactional messaging, providing exactly-once semantics for the producer. |
| flink-connector-kafka_2.11 | 1.7.0 | FlinkKafkaConsumer / FlinkKafkaProducer | >= 1.0.0 | Higher versions are backward compatible, but for Kafka 0.11.x and 0.10.x the dedicated flink-connector-kafka-0.11_2.11 and flink-connector-kafka-0.10_2.11 connectors are recommended. |

Flink's TwoPhaseCommitSinkFunction interface, introduced in version 1.4.0, encapsulates the two-phase commit logic. The Kafka sink connector implements TwoPhaseCommitSinkFunction and requires Kafka 0.11 or later.

public class FlinkKafkaDemo {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment senv = StreamExecutionEnvironment.getExecutionEnvironment();
        senv.enableCheckpointing(1000);
        senv.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Kafka data source
        Map<String, String> config = Configuration.initConfig("commons.xml");
        Properties kafkaProps = new Properties();
        kafkaProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
        kafkaProps.setProperty("group.id", config.get("kafka-groupid"));

        SingleOutputStreamOperator<String> dataStream = senv.addSource(
                new FlinkKafkaConsumer011<>(
                        config.get("kafka-topic"),
                        new SimpleStringSchema(),
                        kafkaProps
                ));

        // sink to Kafka
        FlinkKafkaProducer011<String> producer011 = new FlinkKafkaProducer011<>(
                config.get("kafka-ipport"),
                "test-kafka-producer",
                new SimpleStringSchema());

        producer011.setWriteTimestampToKafka(true);
        dataStream.map(x -> {
            // Throw an exception for the "name4" record to test failure handling
            if ("name4".equals(JSON.parseObject(x).get("name"))) {
                System.out.println("name4 exception test...");
                // throw new RuntimeException("name4 exception test...");
            }

            return x;
        }).addSink(producer011);

        senv.execute(FlinkKafkaDemo.class.getSimpleName());
    }
}
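The demo above creates FlinkKafkaProducer011 with its default at-least-once semantics. To actually get the transactional, exactly-once behaviour described in this section, the producer has to be constructed with Semantic.EXACTLY_ONCE. A minimal sketch, reusing the config map and schema classes from the demo (the property values are placeholders):

// KeyedSerializationSchemaWrapper comes from org.apache.flink.streaming.util.serialization
Properties producerProps = new Properties();
producerProps.setProperty("bootstrap.servers", config.get("kafka-ipport"));
// Must not exceed the broker's transaction.max.timeout.ms
producerProps.setProperty("transaction.timeout.ms", "60000");

FlinkKafkaProducer011<String> exactlyOnceProducer = new FlinkKafkaProducer011<>(
        "test-kafka-producer",                                          // target topic
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        producerProps,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);

dataStream.addSink(exactlyOnceProducer);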

For Flink checkpoints, the JobManager coordinates the TaskManagers' checkpoint storage. Checkpoints are stored in the StateBackend, which is memory-based by default and can be changed to file-based persistent storage.

When checkpoint starts, JobManager injects the barrier into the data stream. Barriers are passed between operators.

Each operator takes a snapshot of the current state and saves it to the state back end. For the Source task, the current offset is saved as the state. The next time the source task recovers from checkpoint, it can recommit the offset and re-consume the data from where it was last saved.

Every internal transform task that encounters a Barrier stores the state to its checkpoint.

The Sink task first writes the data to external Kafka; these writes belong to a pre-committed transaction (the data cannot be consumed yet). When the Sink encounters a barrier, it saves its state to the state back end and opens a new pre-committed transaction.

When the checkpoint is complete, the JobManager sends a notification to all tasks confirming its completion. When the Sink task receives this acknowledgement, it formally commits the earlier transaction; the pending data in Kafka becomes committed and can actually be consumed.

As can be seen, the whole execution is in fact a two-phase commit: each operator "pre-commits" after completing its part, and the "commit confirmation" is only initiated once the sink operation finishes; if execution fails, the pre-committed transaction is aborted.

The specific two-phase commit steps are summarized as follows:

  • After the first record arrives, a Kafka transaction is started and data is written to the Kafka partition log as usual but marked as uncommitted; this is the "pre-commit". Meanwhile, the JobManager triggers the checkpoint: the barrier flows downstream from the source, and every operator that encounters the barrier saves its state to the state back end and notifies the JobManager.
  • The Sink connector receives the barrier, saves its current state to the checkpoint, notifies the JobManager, and opens the next transaction for the data belonging to the next checkpoint.
  • The JobManager receives the notifications from all tasks and sends out a confirmation message indicating that the checkpoint is complete.
  • The Sink task receives the JobManager's confirmation and formally commits the data written during that period.
  • Kafka closes the transaction, and the committed data can then be consumed normally.

Therefore, if StateBackend is used to recover from an outage, only the confirmed submitted operations can be recovered.

6.3. Idempotence and transactions in Kafka

As you can see from the table above, Kafka was only able to guarantee at-least-once or at-most Once semantics prior to version 0.11. Since version 0.11, the EXACTLY_ONCE semantics have been guaranteed with the introduction of idempotent send and transactions.

Idempotence

Before idempotence was introduced, Kafka's normal send and retry flow was as follows:

To implement idempotent semantics on the Producer side, Kafka introduces a Producer ID (PID) and sequence numbers. Each new Producer is assigned a unique PID during initialization; the PID is completely transparent to users and is not exposed to them.

For each <PID, Topic, Partition>, the Producer attaches a sequence number, increasing monotonically from 0, to every message it sends. The broker also maintains a sequence number for each <PID, Topic, Partition> and increments it every time a message is committed. For each received message, the broker accepts it only if its sequence number is exactly one greater than the sequence number the broker maintains (i.e. that of the last committed message); otherwise the message is discarded:

  • If the sequence number is more than one greater than the broker's, some data in between has not yet been written, i.e. the messages are out of order.
  • If the sequence number is smaller than the broker's, the message is duplicate data that has already been saved.
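On the Kafka client side, idempotence is just a producer configuration switch; a minimal sketch (the broker address is a placeholder):

// Enabling the idempotent producer in the Kafka Java client
Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true");   // turns on PID + sequence-number deduplication (implies acks=all)
KafkaProducer<String, String> producer = new KafkaProducer<>(props);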

With idempotence enabled, Kafka's normal send and retry flow is as follows:

Transactions

A transaction means that a group of operations, as an atomic unit, either all succeed or all fail; partial success or partial failure is not possible. For example, if Xiao Ming transfers 1,000 yuan to Xiao Wang, Xiao Ming's account is first reduced by 1,000 and then Xiao Wang's account is increased by 1,000. These two operations must be treated as one transaction; otherwise there would be the problem of the debit happening without the credit. Either both operations fail, meaning the transfer failed, or both succeed, meaning the transfer succeeded. In distributed systems, a two-phase commit protocol is generally used to guarantee this.

To solve the problem that idempotence does not give exactly-once guarantees across sessions and across all partitions, Kafka introduced transactions starting from version 0.11.

To support transactions, Kafka uses a Transaction Coordinator to coordinate the whole transaction and persists the transaction state to an internal topic, similar to the way offsets and consumer groups are managed.

The user provides the application with a global Transaction ID, which does not change across application restarts. To ensure that an old Producer with the same Transaction ID becomes invalid after a new Producer starts, each time a Producer obtains a PID through its Transaction ID it also obtains a monotonically increasing epoch. Since the epoch of the old Producer is smaller than that of the new one, Kafka can easily recognize it as the old Producer and reject its requests. With a Transaction ID, Kafka guarantees:

  • Idempotent sending across sessions: when a new Producer instance with the same Transaction ID is created and starts working, the old Producer stops working.
  • Transaction recovery across sessions: if an application instance goes down, the new instance guarantees that any unfinished old transactions are either committed or aborted, so that the new instance starts working from a clean state.

KIP-98 describes the Kafka transaction mechanism in detail; the complete flow is as follows (a client-side sketch follows the list):

  • The Producer sends a FindCoordinatorRequest to any broker to obtain the address of the Transaction Coordinator.
  • After the Transaction Coordinator is found, the Producer (which has idempotence enabled) issues an InitPidRequest to obtain its PID.
  • Calling beginTransaction() starts a transaction: the Producer records locally that the transaction has started, but the Transaction Coordinator does not consider the transaction started until the Producer sends its first message.
  • The consume-transform-produce stage covers the whole data-processing part of the transaction and consists of multiple requests.
  • Once the writes are done, the application must call either KafkaProducer's commitTransaction or abortTransaction method to end the current transaction.
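A minimal client-side sketch of the flow above (the broker address, transactional ID, and topic are placeholders; production code would handle ProducerFencedException separately by closing the producer instead of aborting):

Properties props = new Properties();
props.put("bootstrap.servers", "broker:9092");
props.put("transactional.id", "demo-transactional-id");   // the user-supplied Transaction ID
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
producer.initTransactions();                 // find the Transaction Coordinator and obtain PID + epoch
try {
    producer.beginTransaction();             // considered started by the coordinator only after the first send
    producer.send(new ProducerRecord<>("demo-topic", "key", "value"));
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();             // roll back the uncommitted writes
    throw e;
} finally {
    producer.close();
}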

6.4 Two-phase Commit Protocol

Two-phase commit refers to a protocol that is often used to implement distributed transactions. It can be simply divided into a pre-commit phase and an actual commit phase. There are two roles: a Coordinator (C) and several transaction Participants (P). The coordinator side is sketched after the note below.

  • C writes a prepare request to its local log and sends the prepare request to every P.
  • After receiving the prepare request, P starts executing the transaction; it returns Yes/OK to C if it succeeds, otherwise No, and records the status in its local log.
  • C collects the responses from all Ps. If every P answered Yes, it performs the commit: it sends a Commit request to each P, and each P commits the transaction on receiving it. If at least one P answered No, C performs an abort: it sends an Abort request to each P, and each P aborts the transaction on receiving it.

Note: both C and P first write the messages they send or receive to their local logs, so that they can recover after a failure, similar to a WAL.
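A schematic, non-production sketch of the coordinator side of this protocol, assuming an illustrative Participant interface with prepare/commit/abort (logging and failure recovery are omitted):

import java.util.List;

interface Participant {
    boolean prepare();   // returns true for "Yes/OK", false for "No"
    void commit();
    void abort();
}

class Coordinator {

    private final List<Participant> participants;

    Coordinator(List<Participant> participants) {
        this.participants = participants;
    }

    boolean runTransaction() {
        // Phase 1: send the prepare request to every participant and collect the votes
        boolean allPrepared = true;
        for (Participant p : participants) {
            allPrepared &= p.prepare();
        }

        // Phase 2: commit only if every participant voted Yes, otherwise abort
        if (allPrepared) {
            participants.forEach(Participant::commit);
        } else {
            participants.forEach(Participant::abort);
        }
        return allPrepared;
    }
}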

7. Reference links

  • Apache Flink v1.10 official documentation
  • Flink state management
