State and fault tolerance are core concepts in Flink. This article covers Flink state, state backends and checkpoints, shows how to configure common checkpoint parameters, and then simulates a server crash to show how a job resumes from a checkpoint.

What is State?

  • When Flink performs window aggregation, sorting and similar operations, processing the data stream depends on state management
  • State is the running status/history of an operator, maintained by Flink (in memory by default)
  • Processing flow: a subtask of an operator receives the input stream, retrieves the corresponding state, computes the new result, and writes the result back into the state
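The flow above (receive a record, look up its state, compute, write the state back) can be illustrated with a small plain-Java model, which also shows why a stateless operator such as filter needs no state at all. This is a hypothetical sketch, not Flink API: the class StatefulVsStateless is made up for this example, and a HashMap stands in for the keyed state that Flink would keep in its state backend.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of one operator subtask; not Flink API.
final class StatefulVsStateless {

    // Stateless, like filter: the output depends only on the current record.
    static boolean isExpensive(int money) {
        return money > 12;
    }

    // Stateful, like a per-key sum: a HashMap stands in for Flink's keyed state.
    private final Map<String, Integer> state = new HashMap<>();

    int sumPerKey(String key, int money) {
        int previous = state.getOrDefault(key, 0); // 1. retrieve the state for this key
        int updated = previous + money;            // 2. compute the new result
        state.put(key, updated);                   // 3. write it back into the state
        return updated;
    }

    public static void main(String[] args) {
        StatefulVsStateless op = new StatefulVsStateless();
        System.out.println(isExpensive(10) + " " + isExpensive(10)); // false false
        System.out.println(op.sumPerKey("Java", 10));                 // 10
        System.out.println(op.sumPerKey("Java", 10));                 // 20: same input, different output
    }
}
```

Calling the filter twice with the same input always gives the same answer, while the same ("Java", 10) record yields 10 and then 20 from the stateful sum.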

Stateful and stateless

  • Stateless computation: no matter how many times the same record enters the operator, the output is the same, e.g. filter

  • Stateful computation: historical state must be taken into account, so the same input can produce different outputs, e.g. aggregations such as sum and reduce

  • State management categories

    • Managed State (commonly used)
      • Managed by Flink, which stores and restores it automatically
      • Two subtypes
        • Keyed State
          • Only available after keyBy, i.e. on a KeyedStream; every key has its own state
          • Usually accessed from a RichFlatMapFunction or another RichFunction, with the state declared in the open() lifecycle method
          • Data structures: ValueState, ListState, MapState, etc.
        • Operator State: state bound to an operator's parallel instance rather than to a key
          • Data structures: ListState, UnionListState, BroadcastState
    • Raw State (rarely used)
      • Managed and maintained by the user
      • Storage structure: byte array
  • State data structures (state values may live in memory, on disk, in a DB, or in other distributed storage)

    • ValueState: stores a single value per key (similar in spirit to a ThreadLocal, but scoped to the current key)
      • ValueState.value()
      • ValueState.update(T value)
    • ListState: a list of values
      • ListState.add(T value)
      • ListState.get() // returns an Iterable
    • MapState: a key/value mapping
      • MapState.get(key)
      • MapState.put(key, value)
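To make the key scoping concrete, here is a hypothetical, simplified model of keyed state in plain Java. The nested classes Value, ListOf and MapOf only mirror the method names listed above (value/update, add/get, get/put); they are not Flink's ValueState, ListState and MapState interfaces, and in a real job the handles would be obtained through getRuntimeContext() in open().

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of keyed state. The method names mirror
// ValueState, ListState and MapState, but these are NOT Flink's interfaces.
final class KeyedStateModel {
    private String currentKey;

    // Flink sets the current key before each record reaches the function.
    void setCurrentKey(String key) {
        currentKey = key;
    }

    // ValueState-like: one value per key.
    final class Value<T> {
        private final Map<String, T> store = new HashMap<>();
        T value() { return store.get(currentKey); }      // null if this key has no value yet
        void update(T v) { store.put(currentKey, v); }
    }

    // ListState-like: a list per key; get() returns an Iterable.
    final class ListOf<T> {
        private final Map<String, List<T>> store = new HashMap<>();
        void add(T v) { store.computeIfAbsent(currentKey, k -> new ArrayList<>()).add(v); }
        Iterable<T> get() { return store.getOrDefault(currentKey, Collections.emptyList()); }
    }

    // MapState-like: a map per key.
    final class MapOf<K, V> {
        private final Map<String, Map<K, V>> store = new HashMap<>();
        V get(K key) { return store.getOrDefault(currentKey, Collections.emptyMap()).get(key); }
        void put(K key, V value) { store.computeIfAbsent(currentKey, k -> new HashMap<>()).put(key, value); }
    }

    public static void main(String[] args) {
        KeyedStateModel ctx = new KeyedStateModel();
        KeyedStateModel.Value<Integer> total = ctx.new Value<Integer>();

        ctx.setCurrentKey("Java");
        total.update(10);
        ctx.setCurrentKey("spring boot");
        System.out.println(total.value()); // null: each key sees only its own state
        ctx.setCurrentKey("Java");
        System.out.println(total.value()); // 10
    }
}
```

The same handle returns different data depending on the current key, which Flink sets automatically for each record after keyBy.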

State backends: where state is stored

  • Flink comes with the following out-of-the-box State Backends:

    • HashMapStateBackend and EmbeddedRocksDBStateBackend (new API, Flink 1.13 and later)

      • If nothing else is configured, HashMapStateBackend is used.
    • (Legacy API) MemoryStateBackend, FsStateBackend and RocksDBStateBackend

      • If no backend is configured, MemoryStateBackend is used by default.

  • How each backend stores state

    • HashMapStateBackend stores data internally as objects in the Java heap.

      • Key/value state and window operators hold hash tables for storing values, triggers, and so on
      • Very fast, because every state access and update operates on an object on the Java heap
      • However, the size of the state is limited by the memory available in the cluster
      • Use cases:
        • Jobs with large state, long windows, and large key/value states.
        • All high-availability setups.
    • EmbeddedRocksDBStateBackend keeps state data in a RocksDB database

      • The database is stored (by default) in the TaskManager's local data directories
      • Unlike HashMapStateBackend, which keeps Java objects, data is stored as serialized byte arrays
      • RocksDB can scale with the available disk space and is the only state backend that supports incremental snapshots
      • However, every state access and update requires (de)serialization and possibly a disk read, so average performance is an order of magnitude slower than the in-memory state backends
      • Use cases:
        • Jobs with very large state, long windows, and large key/value states.
        • All high-availability setups.
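    • Legacy backends
      • MemoryStateBackend: keeps state in memory; not recommended for production
      • FsStateBackend: working state in memory, checkpoints written to a file system such as a local path or HDFS; good performance, commonly used
      • RocksDBStateBackend: state kept in RocksDB on local disk, so large state carries little OOM risk; the choice in most cases

    Code configuration:

    import org.apache.flink.contrib.streaming.state.EmbeddedRocksDBStateBackend;
    import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Keep working state in RocksDB
    env.setStateBackend(new EmbeddedRocksDBStateBackend());
    // Write checkpoints to a directory given as a path string
    env.getCheckpointConfig().setCheckpointStorage("file:///checkpoint-dir");
    // or pass an explicit FileSystemCheckpointStorage
    env.getCheckpointConfig().setCheckpointStorage(new FileSystemCheckpointStorage("file:///checkpoint-dir"));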
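To make the trade-off between the two new backends concrete, here is a toy model in plain Java. It is not Flink code: HeapStore and SerializedStore are hypothetical names, Java's built-in serialization stands in for Flink's own type serializers, and the serialized store keeps its bytes in memory where RocksDB would also use local disk.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Toy model of HashMapStateBackend: live objects on the Java heap, no (de)serialization.
final class HeapStore<V> {
    private final Map<String, V> map = new HashMap<>();
    void put(String key, V value) { map.put(key, value); }
    V get(String key) { return map.get(key); } // returns the stored object itself
}

// Toy model of EmbeddedRocksDBStateBackend: values kept as serialized byte arrays.
final class SerializedStore<V extends Serializable> {
    private final Map<String, byte[]> map = new HashMap<>();
    int serializations = 0;
    int deserializations = 0;

    void put(String key, V value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);                 // serialize on every write
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        map.put(key, bytes.toByteArray());
        serializations++;
    }

    @SuppressWarnings("unchecked")
    V get(String key) {
        byte[] data = map.get(key);
        if (data == null) {
            return null;
        }
        deserializations++;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (V) in.readObject();             // deserialize on every read: a fresh copy
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}

final class BackendDemo {
    public static void main(String[] args) {
        ArrayList<Integer> totals = new ArrayList<>(Arrays.asList(10, 15));

        HeapStore<ArrayList<Integer>> heap = new HeapStore<>();
        heap.put("Java", totals);
        System.out.println(heap.get("Java") == totals);  // true: same object

        SerializedStore<ArrayList<Integer>> rocks = new SerializedStore<>();
        rocks.put("Java", totals);
        ArrayList<Integer> copy = rocks.get("Java");
        System.out.println(copy == totals);              // false: a fresh copy
        System.out.println(copy.equals(totals));         // true: same contents
    }
}
```

The heap store hands back the very same object, which is why it is fast; the serialized store pays a (de)serialization cost on every access but only ever holds bytes, which is what lets RocksDB spill state to disk.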

What is a Checkpoint

  • A global snapshot of the current state of all operators in a Flink job

  • Checkpointing is disabled by default

  • Checkpoints periodically persist state data to durable storage to prevent data loss

  • A manually triggered snapshot (a savepoint) is used for Flink cluster maintenance and upgrades

  • Under the hood, Flink uses a variant of the Chandy-Lamport distributed snapshot algorithm to keep data consistent in a distributed environment

  • A consistent checkpoint of a stateful streaming application is essentially a copy (a snapshot) of the state of all tasks at one point in time; that point should be when all tasks have just finished processing the same input data
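Flink reaches that "same input" point with checkpoint barriers: sources inject a barrier into the stream, and a task with several inputs snapshots its state only once the barrier has arrived on every input, holding back records from inputs whose barrier came early (barrier alignment). Below is a heavily simplified, single-threaded model of that alignment step for one summing task, assuming aligned checkpoints (the default in EXACTLY_ONCE mode). AligningTask and its methods are hypothetical, not Flink classes; a real task would also forward the barrier downstream and write the snapshot asynchronously.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of one task with several input channels that sums values
// and takes aligned snapshots; not Flink's implementation.
final class AligningTask {
    private final int numChannels;
    private long sum = 0;                                        // the task's state
    private final Set<Integer> barrierSeen = new HashSet<>();    // channels whose barrier arrived
    private final List<Deque<Integer>> buffered = new ArrayList<>();
    final Map<Long, Long> snapshots = new LinkedHashMap<>();     // checkpoint id -> state copy

    AligningTask(int numChannels) {
        this.numChannels = numChannels;
        for (int i = 0; i < numChannels; i++) {
            buffered.add(new ArrayDeque<>());
        }
    }

    void onRecord(int channel, int value) {
        if (barrierSeen.contains(channel)) {
            // This channel is already past the barrier: the record belongs to the
            // next checkpoint, so hold it back until alignment completes.
            buffered.get(channel).add(value);
        } else {
            sum += value;
        }
    }

    void onBarrier(int channel, long checkpointId) {
        barrierSeen.add(channel);
        if (barrierSeen.size() < numChannels) {
            return;                                              // still waiting for other channels
        }
        // Barriers from all inputs: the state reflects exactly the records before them.
        snapshots.put(checkpointId, sum);
        barrierSeen.clear();
        // Alignment done: process the records that were held back.
        for (Deque<Integer> queue : buffered) {
            while (!queue.isEmpty()) {
                sum += queue.poll();
            }
        }
    }

    long currentSum() {
        return sum;
    }

    public static void main(String[] args) {
        AligningTask task = new AligningTask(2);
        task.onRecord(0, 1);
        task.onRecord(1, 2);
        task.onBarrier(0, 1L);   // barrier 1 arrives on channel 0 first
        task.onRecord(0, 100);   // after the barrier on channel 0: buffered
        task.onRecord(1, 3);     // channel 1 has not seen the barrier yet: processed
        task.onBarrier(1, 1L);   // alignment complete, snapshot taken
        System.out.println(task.snapshots);     // {1=6}
        System.out.println(task.currentSum());  // 106
    }
}
```

The snapshot for checkpoint 1 contains 6, the total of every record that preceded the barrier on both channels, and excludes the 100 that arrived after the barrier on channel 0.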

Checkpoint storage types bundled with Flink:

  • JobManagerCheckpointStorage: stores checkpoints in the JobManager's heap memory
  • FileSystemCheckpointStorage: stores checkpoints in a file system (a local path, HDFS, etc.)

End-to-end state consistency

The consistency guarantees discussed so far are provided by the stream processor itself, i.e. inside Flink. A real application also includes the data source (such as Kafka or MySQL) and the persistent systems it writes to (Kafka, MySQL, HBase, ClickHouse). End-to-end consistency means the correctness of the result must hold through every link of the streaming application, so each component must guarantee its own consistency:
  • Source
    • The external source must allow the read position to be reset, so that after a failure the offset can be reset to the position before the failure
  • Internal
    • Flink's checkpoint mechanism, which allows state to be recovered after a failure
  • Sink
    • After recovery, data must not be written to the external system twice; this is usually achieved with idempotent writes or transactional writes (in conjunction with checkpoints)
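The source and sink requirements can be illustrated with a toy pipeline: a source whose offset is saved at each checkpoint and rewound after a failure, and two sinks, one that writes by key (an idempotent upsert) and one that blindly appends. Everything here is a hypothetical in-memory model (ReplayPipeline is made up for this example); a real job would rely on, for example, Kafka offsets and an upsert into the external store.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory pipeline: replayable source + two kinds of sink.
final class ReplayPipeline {
    private final List<String> log;        // replayable source, read by offset
    private int offset = 0;                // current read position
    private int checkpointedOffset = 0;    // position saved by the last completed checkpoint
    final Map<String, String> upsertSink = new LinkedHashMap<>(); // idempotent: write by key
    final List<String> appendSink = new ArrayList<>();            // not idempotent: plain append

    ReplayPipeline(List<String> log) {
        this.log = log;
    }

    // Read up to n records and write each one to both sinks.
    void process(int n) {
        for (int i = 0; i < n && offset < log.size(); i++) {
            String record = log.get(offset++);
            upsertSink.put(record, record);   // a replayed record overwrites itself
            appendSink.add(record);           // a replayed record is written twice
        }
    }

    void checkpoint() {
        checkpointedOffset = offset;
    }

    // Failure: progress since the checkpoint is lost and the source rewinds.
    void failAndRecover() {
        offset = checkpointedOffset;
    }

    public static void main(String[] args) {
        ReplayPipeline p = new ReplayPipeline(Arrays.asList("o1", "o2", "o3", "o4"));
        p.process(2);
        p.checkpoint();       // offset 2 is saved
        p.process(1);         // o3 is written, then the job fails
        p.failAndRecover();   // rewind to offset 2
        p.process(2);         // o3 is replayed, o4 is new
        System.out.println(p.upsertSink.keySet()); // [o1, o2, o3, o4]
        System.out.println(p.appendSink);          // [o1, o2, o3, o3, o4]
    }
}
```

Rewinding the source guarantees nothing is lost, but only the idempotent sink avoids the duplicate o3; a transactional sink would reach the same result by committing writes only when a checkpoint completes.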

Configuring common checkpoint parameters

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Trigger a checkpoint every 1000 ms
env.getCheckpointConfig().setCheckpointInterval(1000);
// Use exactly-once checkpointing mode
env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
// Abort a checkpoint that has not completed within 60 s. Checkpoints can be slow when the state
// is large or the state backend has problems, and they must not block the job indefinitely
env.getCheckpointConfig().setCheckpointTimeout(60000L);
// Whether to keep checkpoint data when the job is cancelled or fails. This setting matters:
// a wrong choice can leave you with no checkpoint to restore from. Two values are available:
// ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: keep the checkpoint when the job is cancelled;
//   it must then be cleaned up manually.
// ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: delete the checkpoint when the job is cancelled;
//   the checkpoint is only kept if the job fails.
// (Newer Flink versions deprecate this method in favour of setExternalizedCheckpointCleanup.)
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
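The two cleanup modes amount to a small decision table. The sketch below is a hypothetical model of the behaviour described in the comments above: CleanupMode only mirrors the names of Flink's ExternalizedCheckpointCleanup constants, and JobEnd and RetentionPolicy are made up for illustration.

```java
// Hypothetical model of the two cleanup modes; the enum mirrors Flink's
// constant names but is not Flink's type.
enum CleanupMode { RETAIN_ON_CANCELLATION, DELETE_ON_CANCELLATION }

enum JobEnd { CANCELLED, FAILED }

final class RetentionPolicy {
    // True if the externalized checkpoint is still on storage after the job ends this way.
    static boolean checkpointRetained(CleanupMode mode, JobEnd end) {
        switch (end) {
            case FAILED:
                return true;   // both modes keep the checkpoint so the job can be restored
            case CANCELLED:
                return mode == CleanupMode.RETAIN_ON_CANCELLATION;
            default:
                throw new IllegalArgumentException("Unknown job end: " + end);
        }
    }

    public static void main(String[] args) {
        for (CleanupMode mode : CleanupMode.values()) {
            for (JobEnd end : JobEnd.values()) {
                System.out.println(mode + " + " + end + " -> retained=" + checkpointRetained(mode, end));
            }
        }
    }
}
```

Since the hands-on test later in this article may cancel the job before restoring it, it uses RETAIN_ON_CANCELLATION.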

Hands-on example:

To simulate orders generated in real time in a production environment, we define a data source that continuously generates mock order data.

Order class:

import java.util.Date;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@AllArgsConstructor
@NoArgsConstructor
public class VideoOrder {
    private String tradeNo;
    private String title;
    private int money;
    private int userId;
    private Date createTime;

    @Override
    public String toString() {
        return "VideoOrder{" +
                "tradeNo='" + tradeNo + '\'' +
                ", title='" + title + '\'' +
                ", money=" + money +
                ", userId=" + userId +
                ", createTime=" + createTime +
                '}';
    }
}
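import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Random;
import java.util.UUID;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class VideoOrderSourceV2 extends RichParallelSourceFunction<VideoOrder> {

    private volatile boolean flag = true;
    private final Random random = new Random();
    // The original used the author's own TimeUtil helper; SimpleDateFormat stands in for it here
    private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static final List<VideoOrder> list = new ArrayList<>();

    static {
        list.add(new VideoOrder("", "Java", 10, 0, null));
        list.add(new VideoOrder("", "spring boot", 15, 0, null));
    }

    /**
     * Called once per parallel instance before run(), e.g. to open connections
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        System.out.println("-----open-----");
    }

    /**
     * Called when the source shuts down, e.g. to release resources
     */
    @Override
    public void close() throws Exception {
        System.out.println("-----close-----");
    }

    /**
     * Data generation logic: emit one random order per second
     */
    @Override
    public void run(SourceContext<VideoOrder> ctx) throws Exception {
        while (flag) {
            Thread.sleep(1000);
            String id = UUID.randomUUID().toString().substring(30);
            int userId = random.nextInt(10);
            VideoOrder template = list.get(random.nextInt(list.size()));
            // Build a new object instead of mutating the shared template in the static list
            VideoOrder videoOrder = new VideoOrder(id, template.getTitle(), template.getMoney(), userId, new Date());
            System.out.println("generate: " + videoOrder.getTitle() + ", " + videoOrder.getMoney()
                    + ", " + timeFormat.format(videoOrder.getCreateTime()));
            ctx.collect(videoOrder);
        }
    }

    /**
     * Called on cancellation: stops the loop in run()
     */
    @Override
    public void cancel() {
        flag = false;
    }
}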

The source prints each generated order to the console as "generate: <title>, <money>, <createTime>" before emitting it.

Main program: use the reduce operator to keep a rolling sum of the order amounts for each video title, and enable checkpointing so that the accumulated state can be recovered after a failure


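import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.runtime.state.storage.FileSystemCheckpointStorage;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkKeyByReduceApp {

    /**
     * source -> transformation -> sink
     */
    public static void main(String[] args) throws Exception {

        // Build the execution environment: the entry point for starting the job, which also holds global parameters
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Checkpoint every 5 s in exactly-once mode and keep checkpoints when the job is cancelled
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        env.enableCheckpointing(5000);
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 192.168.192.100 is the author's own HDFS address; replace it with yours
        env.getCheckpointConfig().setCheckpointStorage(
                new FileSystemCheckpointStorage("hdfs://192.168.192.100:8020/checkpoint"));

        DataStreamSource<VideoOrder> ds = env.addSource(new VideoOrderSourceV2());

        // Group orders by video title
        KeyedStream<VideoOrder, String> videoOrderStringKeyedStream = ds.keyBy(new KeySelector<VideoOrder, String>() {
            @Override
            public String getKey(VideoOrder value) throws Exception {
                return value.getTitle();
            }
        });

        // Rolling sum of money per title; the running total is kept in keyed state
        SingleOutputStreamOperator<VideoOrder> reduce = videoOrderStringKeyedStream.reduce(new ReduceFunction<VideoOrder>() {
            @Override
            public VideoOrder reduce(VideoOrder value1, VideoOrder value2) throws Exception {
                VideoOrder videoOrder = new VideoOrder();
                videoOrder.setTitle(value1.getTitle());
                videoOrder.setMoney(value1.getMoney() + value2.getMoney());
                return videoOrder;
            }
        });

        reduce.print();
        env.execute("job");
    }
}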
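To predict what reduce.print() will show, the keyed reduce can be modelled without Flink: it stores the last reduced value per key, emits the first record of a key unchanged, and for each later record emits reduce(previous, current). RollingReduceModel is a hypothetical stand-in written for this article, simplified to integer amounts instead of VideoOrder objects.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BinaryOperator;

// Hypothetical model of keyBy(...).reduce(...) semantics; not Flink's operator class.
final class RollingReduceModel<T> {
    private final Map<String, T> lastValuePerKey = new HashMap<>(); // stands in for keyed state
    private final BinaryOperator<T> reducer;

    RollingReduceModel(BinaryOperator<T> reducer) {
        this.reducer = reducer;
    }

    // Returns the value that would be emitted downstream (e.g. to print()).
    T process(String key, T value) {
        T previous = lastValuePerKey.get(key);
        // First record of a key is emitted unchanged; later ones are reduced with the stored value.
        T result = (previous == null) ? value : reducer.apply(previous, value);
        lastValuePerKey.put(key, result);
        return result;
    }

    public static void main(String[] args) {
        RollingReduceModel<Integer> moneyPerTitle = new RollingReduceModel<>(Integer::sum);
        System.out.println(moneyPerTitle.process("Java", 10));        // 10
        System.out.println(moneyPerTitle.process("spring boot", 15)); // 15
        System.out.println(moneyPerTitle.process("Java", 10));        // 20
    }
}
```

Every incoming order produces one output line carrying the new running total for its title, which is the "rolling" output seen in the local run.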

When run locally, the output shows the amounts being continuously rolled up for each video title

Then check HDFS on the server to confirm that the checkpoint data exists

Next, package the application and upload it to the server for testing. You can submit the JAR through Flink's web UI or from the command line, and then watch the related log output while the program runs:

./bin/flink run -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /xiaochan-flink.jar 

Simulating a crash

While the program is running, Flink shows the job ID. We then cancel the job manually or kill the service directly, forcing the job to stop.

In HDFS, the checkpoint data we configured is still there. The following command resumes the computation from the order state saved before the crash:

-s specifies the path of the checkpoint metadata, which records the computation state before the crash (in Flink's layout, the directory after /checkpoint is named after the job ID):

./bin/flink run -s /checkpoint/id/chk-23/_metadata -c net.xxx.xxx.FlinkKeyByReduceApp -p 3 /root/xdclass-flink.jar
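The -s argument points at the _metadata file of a chk-<n> directory under <checkpoint-dir>/<job-id>/, which sits next to Flink's shared and taskowned directories. The hypothetical helper below, which is not part of Flink, picks the highest-numbered chk- entry from a list of directory names (for example, the names shown by hdfs dfs -ls) and builds the restore path. By default Flink retains only the latest completed checkpoint, so there is often a single chk- directory; it is still worth checking that the chosen one contains a _metadata file.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical helper (not part of Flink): given the entries of <checkpoint-dir>/<job-id>,
// build the -s argument for the highest-numbered chk-<n> directory.
final class CheckpointPaths {
    static Optional<String> latestMetadataPath(String jobCheckpointDir, List<String> entries) {
        long latest = -1;
        for (String name : entries) {
            if (!name.startsWith("chk-")) {
                continue;   // skip "shared", "taskowned" and anything else
            }
            try {
                // Compare numerically: "chk-23" is newer than "chk-9".
                latest = Math.max(latest, Long.parseLong(name.substring("chk-".length())));
            } catch (NumberFormatException ignored) {
                // not a checkpoint directory name
            }
        }
        return latest < 0
                ? Optional.empty()
                : Optional.of(jobCheckpointDir + "/chk-" + latest + "/_metadata");
    }

    public static void main(String[] args) {
        List<String> entries = Arrays.asList("shared", "taskowned", "chk-9", "chk-23");
        System.out.println(latestMetadataPath("/checkpoint/id", entries).orElse("none"));
        // /checkpoint/id/chk-23/_metadata
    }
}
```

Comparing the numeric suffix matters because a plain string sort would rank chk-9 above chk-23.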

After running the restore command, open the web UI to check that the job was submitted successfully.

When close appears in the log, the program has stopped, i.e. the server has gone down; the order totals at that moment are shown in the red box in the figure above. After running the restore command, check the log again: it starts with open, and this time the totals do not start from the initial state but continue from the results computed before the crash. This completes the hands-on test.
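The behaviour seen in the log, totals continuing from the pre-crash values instead of restarting from zero, can be reproduced with a toy model: the per-key totals are copied at each checkpoint, a "crash" discards the live state, and restoring loads the last copy. RestorableTotals is hypothetical; in the real job Flink writes the state to HDFS and reloads it through -s. Updates made after the last completed checkpoint are not in the restored state, and this article's random generator source cannot replay them.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of restoring keyed totals from the last completed checkpoint.
final class RestorableTotals {
    private Map<String, Integer> live = new HashMap<>();            // working state
    private Map<String, Integer> lastCheckpoint = new HashMap<>();  // last completed snapshot

    // Rolling total per title, like the reduce job; returns the new total.
    int add(String title, int money) {
        return live.merge(title, money, Integer::sum);
    }

    void checkpoint() {
        lastCheckpoint = new HashMap<>(live);   // copy, so later updates do not leak in
    }

    void crash() {
        live = new HashMap<>();                 // in-memory state is gone
    }

    void restore() {
        live = new HashMap<>(lastCheckpoint);   // analogous to restarting with flink run -s
    }

    public static void main(String[] args) {
        RestorableTotals totals = new RestorableTotals();
        totals.add("Java", 10);
        totals.add("Java", 10);
        totals.checkpoint();                           // snapshot: Java=20
        totals.add("Java", 10);                        // 30, but not yet checkpointed
        totals.crash();
        totals.restore();
        System.out.println(totals.add("Java", 10));    // 30: continues from 20, not from 0
    }
}
```

Without the restore step, the next Java order would print 10 again, which is what the job would show if it were restarted without -s.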