Author: Sun Mengyao Collated: Han Fei

The main contents of this article are as follows:

  • Basic concepts of state management;
  • Types and usage examples of states;
  • Fault tolerance mechanism and fault recovery;

Basic concepts of state management

1. What is state

Let’s start with an example of stateless computation: computing consumer lag. Suppose there is a message queue to which a producer continuously writes messages, and multiple consumers read messages from it. As the figure shows, the producer has written 16 messages and its offset is at 15. There are three consumers, some fast and some slow. The fast consumers have consumed 13 messages, while the slow ones have consumed only 7 or 8.

How can we compute in real time how many messages each consumer is behind? The figure shows an example input and output. Each input record carries a timestamp, the offset the producer had written up to at that time, and the offset each consumer had read up to at the same time. In this example the producer has written up to 15, and the consumers have read up to 10, 7, and 12 respectively. The question is how to turn the progress of the producer and consumers into the lag information shown on the right.

The expected result is that Consumer 0 lags by 5, Consumer 1 by 8, and Consumer 2 by 3. In Flink, this calls for a Map operation. Map reads each message in and performs the subtraction to find how many messages each consumer is behind. Map then emits each result downstream, which yields the final output.

In this mode of computation, the same input always produces the same output no matter how many times it is fed in, because a single input record already contains all the information needed. Consumer lag equals the producer offset minus the consumer offset. Both offsets are available in a single record, so the same input always yields the same output. This is stateless computation.
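The lag calculation above can be sketched in a few lines of plain Python. This is only an illustration of a stateless map, not Flink code; the `Progress` record, the `compute_lag` function, and the consumer names are hypothetical, and the timestamp is a placeholder.

```python
from typing import Dict, NamedTuple


class Progress(NamedTuple):
    """One input record: offsets observed at a single point in time."""
    timestamp: int
    producer_offset: int
    consumer_offsets: Dict[str, int]


def compute_lag(record: Progress) -> Dict[str, int]:
    """Stateless map: lag = producer offset - consumer offset.

    Everything needed is inside the record, so nothing is remembered
    between calls.
    """
    return {
        name: record.producer_offset - offset
        for name, offset in record.consumer_offsets.items()
    }


record = Progress(
    timestamp=0,  # placeholder value, not taken from the article
    producer_offset=15,
    consumer_offsets={"consumer0": 10, "consumer1": 7, "consumer2": 12},
)
print(compute_lag(record))
```

Calling `compute_lag` on the same record any number of times gives the same result, which is exactly the property that makes the computation stateless.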

What, then, is stateful computation?

Take access log statistics as an example. Suppose we receive Nginx access logs, where each log line represents one request and records where the request came from and which address it accessed. We need to count in real time how many times each address has been accessed, that is, how many times each API has been called. Consider the following simplified input and output. The first log records a GET of /api/a at some point in time; the second records a POST to /api/b; the third records another GET of /api/a. There are three Nginx logs in total. From these three logs, the first output is that /api/a has been accessed once, the second is that /api/b has been accessed once, and the third is that /api/a has been accessed twice. The two requests for /api/a are the same input, yet the first yields count=1 and the second yields count=2. The output depends on how many times the requested API address has been accessed so far. On the first request the accumulated count is 0, so the output is count=1. On the second request the address has already been accessed once, so the output for /api/a is count=2. A single record contains only the information about the current access, not the whole history. To produce the result, the computation also depends on the accumulated access count of each API, and that accumulated count is the state.

In this computation mode, data is fed into an operator, which performs arbitrarily complex computation and emits output. While doing so, the operator reads the state that was stored previously and also updates that state in real time with the effect of the current record. One result is emitted per input record, so 200 input records produce 200 results.
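A minimal plain-Python sketch of this stateful pattern follows. It is not Flink code; the `AccessCounter` class is hypothetical, and the three log entries mirror the Nginx requests in the example above.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class AccessCounter:
    """Stateful operator sketch: remembers how often each URL was seen."""

    def __init__(self) -> None:
        # The accumulated per-URL count is the state.
        self.counts: Dict[str, int] = defaultdict(int)

    def process(self, method: str, url: str) -> Tuple[str, int]:
        # Read the previous state, update it, and emit one result per input.
        self.counts[url] += 1
        return url, self.counts[url]


counter = AccessCounter()
logs: List[Tuple[str, str]] = [
    ("GET", "/api/a"),
    ("POST", "/api/b"),
    ("GET", "/api/a"),
]
results = [counter.process(method, url) for method, url in logs]
print(results)
```

The first and third inputs are identical, but their outputs differ because the second call sees the count left behind by the first.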

What scenarios use states? Here are four common ones:

  • Deduplication: the data from an upstream system may contain duplicates, which should be removed before the data lands in the downstream system. To deduplicate, you need to know which records have already arrived and which have not. All primary keys seen so far are recorded, and when a record arrives you check whether its primary key is already among them.
  • Window computation: for example, counting how many times each API in the Nginx log is accessed per minute. The window fires once a minute. Before a window such as 08:00 to 08:01 fires, the data from the first 59 seconds must be kept in memory. The data in the window is retained first, and the result for the whole window is emitted when the window fires at 08:01. Data in windows that have not yet fired is also state.
  • Machine learning/deep learning: the trained model and its current parameters are also state. Machine learning may consume one data set at a time, learn from it, and feed the result back into the model.
  • Access to historical data: for example, to compare with yesterday’s data you need to access some historical data. Reading it from an external system every time may consume a lot of resources, so it is preferable to keep the historical data in state for comparison.
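As an illustration of the deduplication scenario in the list above, the following plain-Python sketch keeps the set of primary keys seen so far as its state. The `deduplicate` function and the sample records are hypothetical.

```python
from typing import Hashable, Iterable, Iterator, Set, Tuple


def deduplicate(
    records: Iterable[Tuple[Hashable, str]],
) -> Iterator[Tuple[Hashable, str]]:
    """Emit each (primary_key, payload) record only the first time its key appears."""
    seen: Set[Hashable] = set()  # state: every primary key observed so far
    for key, payload in records:
        if key in seen:
            continue  # duplicate from upstream, drop it
        seen.add(key)
        yield key, payload


upstream = [(1, "a"), (2, "b"), (1, "a"), (3, "c"), (2, "b")]
print(list(deduplicate(upstream)))
```

Note that the `seen` set grows with the number of distinct keys, which is one reason state size has to be managed rather than assumed to fit in memory.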

2. Why manage state

The most straightforward way to manage state is to keep all the data in memory, which is also common practice. In WordCount, for example, the word is the input and the count is the output. The count is accumulated in memory as input keeps arriving.

However, streaming jobs have the following requirements:

  • 24/7 operation with high reliability;
  • Data is neither lost nor duplicated, and each record is processed exactly once;
  • Real-time output with no delay;

Given these requirements, keeping everything in memory causes problems. First, memory capacity is limited. If you compute over a 24-hour window and keep all 24 hours of data in memory, memory may run out. Second, a job that runs 24/7 must be highly available. If a machine fails or goes down, you need to consider how to back up the state and recover from the backup so that the running job is not affected. Third, consider horizontal scaling. When site traffic is low, the program that counts accesses per API can run in a single thread. If traffic suddenly increases and a single node can no longer process all the access data, more nodes must be added, and distributing the state evenly to the new nodes becomes a problem as well. Therefore, putting all the data in memory is not the most appropriate way to manage state.

3. Ideal state management

Ideal state management needs to be easy to use, efficient, and reliable:

  • Easy to use: Flink provides rich data structures, several ways of organizing state, and a concise extension interface, which make state management easier to use;
  • Efficient: real-time jobs generally require low latency and fast recovery after a failure. When processing capacity is insufficient, the job can be scaled out horizontally without affecting its processing performance;
  • Reliable: Flink provides state persistence with no-loss, no-duplication semantics, together with automatic fault tolerance. With HA, for example, a failed node is restarted automatically without human intervention.

Flink state types and usage examples

1. Managed State & Raw State

Managed State is state that Flink manages automatically, while Raw State is raw state that users manage themselves. The differences between the two are as follows:

  • In terms of state management, Managed State is managed by the Flink Runtime, which stores and recovers it automatically and optimizes its memory management. Raw State must be managed and serialized by users themselves. Flink does not know the structure of the data stored in it, only the user does, so the user must serialize it into a storable form.
  • In terms of state data structures, Managed State supports known data structures such as Value, List, and Map. Raw State supports only byte arrays, so all state must be converted into binary byte arrays.
  • In terms of recommended usage, Managed State can be used in most scenarios. Raw State is recommended only when Managed State is insufficient, for example when you need to implement a custom Operator.

2. Keyed State & Operator State

Managed State is divided into two types: Keyed State and Operator State. In Flink’s streaming model, a DataStream becomes a KeyedStream after keyBy. Each key corresponds to one state, so an Operator instance that processes multiple keys accesses multiple corresponding states. This is where Keyed State comes from. Keyed State can only be used in operators on a KeyedStream. If a program never calls keyBy, it has no KeyedStream and therefore cannot use Keyed State.

In contrast, Operator State can be used in all operators. It is a good fit for data sources and is commonly used in sources such as FlinkKafkaConsumer. Unlike Keyed State, one Operator instance corresponds to one state. With Keyed State, when parallelism changes, state migrates between instances together with its key. For example, with a single instance, the states of /api/a and /api/b are both stored in that instance. If the request volume grows and the job is scaled out, the states of /api/a and /api/b may be placed on different nodes. Because Operator State has no key, you need to choose how the state is redistributed when parallelism changes. There are two built-in redistribution modes: one splits the state evenly, and the other merges all the state into the full state and hands the full state to every instance.

In terms of access, Keyed State is accessed through the RuntimeContext, which requires the Operator to be a Rich Function. To use Operator State, the operator must implement the CheckpointedFunction or ListCheckpointed interface itself. In terms of data structures, Keyed State supports ValueState, ListState, ReducingState, AggregatingState, and MapState. Operator State supports relatively few data structures, such as ListState.
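The two built-in redistribution modes for Operator State can be pictured with a small plain-Python sketch. The function names `even_split` and `union` and the partition labels are hypothetical, and the round-robin assignment is an assumption made for illustration; Flink’s actual assignment of list elements to instances may differ.

```python
from typing import List, TypeVar

T = TypeVar("T")


def even_split(old_states: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """Merge the list state of all old instances, then deal it out evenly.

    Round-robin is an illustrative choice, not necessarily Flink's strategy.
    """
    merged = [item for state in old_states for item in state]
    new_states: List[List[T]] = [[] for _ in range(new_parallelism)]
    for index, item in enumerate(merged):
        new_states[index % new_parallelism].append(item)
    return new_states


def union(old_states: List[List[T]], new_parallelism: int) -> List[List[T]]:
    """Merge all state and give every new instance the full list."""
    merged = [item for state in old_states for item in state]
    return [list(merged) for _ in range(new_parallelism)]


# Two old source instances, each tracking two hypothetical partitions.
old = [["p0", "p1"], ["p2", "p3"]]
print(even_split(old, 3))
print(union(old, 3))
```

With the union mode, each instance receives everything and has to decide for itself which elements it is responsible for.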

3. Keyed State Example

There are several kinds of Keyed State, and the relationship among them is shown in the figure. The direct subclasses of State are ValueState, MapState, and AppendingState. AppendingState has a subclass, MergingState, which in turn has three subclasses: ListState, ReducingState, and AggregatingState. Because of this inheritance hierarchy, their access methods and data structures differ.

The differences between the kinds of Keyed State are as follows:

  • ValueState stores a single value. In WordCount, for example, the word is the key and the count is the state. The single value can be a number or a string. Its access interface consists of a setter and a getter: update(T) and T value().
  • MapState has Map as its state data type, and its access interface includes put, remove, and so on. Note that the key of a MapState is not the same thing as the key of the Keyed State.
  • ListState has List as its state data type, and its access interface includes add, update, and so on.
  • ReducingState and AggregatingState share the same parent class as ListState, but their state data type is a single value. Their add method does not append the current element to a list. Instead, it merges the current element directly into the reduced result.
  • ReducingState and AggregatingState differ in their access interface. In ReducingState, the element passed to add(T) and the value returned by T get() have the same type. In AggregatingState, the input type is IN and the output type is OUT, so add(IN) and OUT get() may use different types.
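To make the differences in the list above concrete, here is a plain-Python imitation of four of these state kinds. These classes are hypothetical stand-ins written for illustration only; they are not Flink’s classes, and in Flink each state is additionally scoped to the current key.

```python
from typing import Callable, Generic, List, Optional, TypeVar

T = TypeVar("T")
IN = TypeVar("IN")
ACC = TypeVar("ACC")
OUT = TypeVar("OUT")


class ValueStateSketch(Generic[T]):
    """Single value with a getter and a setter."""

    def __init__(self) -> None:
        self._value: Optional[T] = None

    def value(self) -> Optional[T]:
        return self._value

    def update(self, value: T) -> None:
        self._value = value

    def clear(self) -> None:
        self._value = None


class ListStateSketch(Generic[T]):
    """add() appends; get() returns every element added so far."""

    def __init__(self) -> None:
        self._items: List[T] = []

    def add(self, value: T) -> None:
        self._items.append(value)

    def get(self) -> List[T]:
        return list(self._items)


class ReducingStateSketch(Generic[T]):
    """add() folds the element into one value; input and output types match."""

    def __init__(self, reduce_fn: Callable[[T, T], T]) -> None:
        self._reduce_fn = reduce_fn
        self._value: Optional[T] = None

    def add(self, value: T) -> None:
        self._value = value if self._value is None else self._reduce_fn(self._value, value)

    def get(self) -> Optional[T]:
        return self._value


class AggregatingStateSketch(Generic[IN, ACC, OUT]):
    """add() takes IN, get() returns OUT; the two types may differ."""

    def __init__(
        self,
        create_accumulator: Callable[[], ACC],
        add_fn: Callable[[IN, ACC], ACC],
        get_result: Callable[[ACC], OUT],
    ) -> None:
        self._create = create_accumulator
        self._add_fn = add_fn
        self._get_result = get_result
        self._acc: Optional[ACC] = None

    def add(self, value: IN) -> None:
        acc = self._create() if self._acc is None else self._acc
        self._acc = self._add_fn(value, acc)

    def get(self) -> Optional[OUT]:
        return None if self._acc is None else self._get_result(self._acc)


numbers = ListStateSketch[int]()
total = ReducingStateSketch[int](lambda a, b: a + b)
# Accumulator is (sum, count); the output is a float average.
average = AggregatingStateSketch(
    lambda: (0, 0),
    lambda x, acc: (acc[0] + x, acc[1] + 1),
    lambda acc: acc[0] / acc[1],
)
for n in (1, 2, 3):
    numbers.add(n)
    total.add(n)
    average.add(n)
print(numbers.get(), total.get(), average.get())
```

The same three inputs end up as a list, a single integer, and a float, which is the distinction the list above draws between ListState, ReducingState, and AggregatingState.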

ValueState is used as an example to explain how to use it.

Source code address: github.com/apache/flin…

Interested readers can view the full source code directly. The figure in this section shows the main method of the Flink job. The input handling at the top, the output handling at the bottom, and some job-specific configuration have been removed, leaving only the core logic.

events is a DataStream that is loaded with data through env.addSource. A second DataStream, alerts, is derived from it by keying the stream by sourceAddress and then applying flatMap with a StateMachineMapper. StateMachineMapper is a state machine, that is, a set of states together with the transitions between them. A simple example is the process of buying something. First the order is placed, and once the order is created its state is pending payment. When a payment-success event arrives, the state changes from pending payment to pending shipment. When a shipped event arrives for an order that is pending shipment, the state changes to in delivery. When a signed-for event arrives for an order that is in delivery, the state changes to received. During this process a cancel-order event can arrive at any time, and whichever state the order is in, a cancel-order event changes its state to canceled, at which point the state machine ends.

How is this state machine implemented in Flink? StateMachineMapper uses Keyed State, Keyed State is obtained through getRuntimeContext, and getRuntimeContext is only available in a Rich Function. So currentState is obtained in the open method by calling getState on the runtime context. currentState holds the current state of the state machine.

If an order has just been placed, currentState holds pending payment, and once initialized, currentState tracks the order until it is completed. When an event arrives, the flatMap method is invoked. In flatMap, a local variable state is defined first and assigned the value read from currentState, and then the value is checked for null. If it is null, the state for this sourceAddress has never been used, so it must be the initial state of a newly created order, that is, pending payment. We then assign state = State.Initial. Note that state here is a local variable, not the state managed by Flink; it only holds the value read from the managed state. Next, another local variable, nextState, is computed by applying transition to state with the incoming event. For example, when a pending-payment order receives a payment-success event, it becomes paid and pending shipment. We also need to check whether nextState is legal. For example, if a signed-for order receives a cancel-order event, a signed-for order cannot be canceled, so the transition is illegal and an alert for it is emitted downstream.

If the transition is legal, the next check is whether the new state is terminal, meaning no further transition is possible. For example, once the state becomes canceled, no other state can follow, so the value is removed from the managed state with clear. clear is a public method of every kind of Flink-managed Keyed State and deletes the stored information. If the new state is neither illegal nor terminal, more transitions may follow, so the current state of the order is updated. This completes the initialization, reading, updating, and clearing of a ValueState. Throughout the process, the role of the state machine is to emit illegal transitions downstream for further handling. Other kinds of state are used in a similar way.
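The read, check, transition, clear-or-update flow described above can be sketched in plain Python using the order example. This is not the Flink StateMachineExample source; the `OrderState` enum, the event names, the `TRANSITIONS` table, and the dictionary standing in for the keyed ValueState are all assumptions made for illustration.

```python
from enum import Enum
from typing import Dict, List, Tuple


class OrderState(Enum):
    PENDING_PAYMENT = "pending payment"
    PENDING_SHIPMENT = "pending shipment"
    IN_DELIVERY = "in delivery"
    RECEIVED = "received"
    CANCELED = "canceled"


# Legal transitions; any (state, event) pair not listed here is illegal.
TRANSITIONS: Dict[Tuple[OrderState, str], OrderState] = {
    (OrderState.PENDING_PAYMENT, "pay"): OrderState.PENDING_SHIPMENT,
    (OrderState.PENDING_SHIPMENT, "ship"): OrderState.IN_DELIVERY,
    (OrderState.IN_DELIVERY, "sign"): OrderState.RECEIVED,
    (OrderState.PENDING_PAYMENT, "cancel"): OrderState.CANCELED,
    (OrderState.PENDING_SHIPMENT, "cancel"): OrderState.CANCELED,
    (OrderState.IN_DELIVERY, "cancel"): OrderState.CANCELED,
}


class OrderStateMachine:
    """Sketch of a keyed flatMap; a dict per order id stands in for ValueState."""

    def __init__(self) -> None:
        self.current_state: Dict[str, OrderState] = {}

    def flat_map(self, order_id: str, event: str) -> List[str]:
        # Read the managed state into a local variable (like value()).
        state = self.current_state.get(order_id)
        if state is None:
            state = OrderState.PENDING_PAYMENT  # initial state of a new order

        next_state = TRANSITIONS.get((state, event))
        if next_state is None:
            # Illegal transition: emit an alert, leave the stored state untouched.
            return [f"order {order_id}: illegal event '{event}' in state '{state.value}'"]
        if next_state is OrderState.CANCELED:
            # Terminal state: nothing can follow, so drop the entry (like clear()).
            self.current_state.pop(order_id, None)
        else:
            # More transitions may follow: store the new state (like update()).
            self.current_state[order_id] = next_state
        return []


machine = OrderStateMachine()
alerts: List[str] = []
for order_id, event in [("o1", "pay"), ("o1", "ship"), ("o1", "sign"), ("o1", "cancel"), ("o2", "cancel")]:
    alerts.extend(machine.flat_map(order_id, event))
print(alerts)
```

Here order o1 is signed for and then receives a cancel event, which produces the only alert, while order o2 is canceled right away and its entry is cleared.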

Fault tolerance mechanism and fault recovery

1. How to save and restore state

Flink saves state through the Checkpoint mechanism, which periodically takes distributed snapshots to back up the state of the program. For how distributed snapshots are implemented, refer to [Lesson 2]; the details are not repeated here. Once a distributed snapshot has completed, how is a failure recovered from? Suppose the job runs on three machines and one of them fails. The processes or threads on the failed machine must be moved to the two remaining machines. All tasks in the job are then rolled back to the state of the last successful checkpoint and continue processing from that point.

To recover from a checkpoint, the data source must support replaying data. After recovery from a checkpoint, Flink provides two consistency semantics: exactly once and at least once. Which one applies is determined by whether barriers are aligned during checkpointing. With alignment the semantics are exactly once; without it they are at least once. If the job runs in a single thread, barriers do not need to be aligned. With only one checkpoint in progress, recovery always returns to exactly that state. With multiple upstream nodes, if the barrier from one input has arrived while the barrier from another has not, and the in-memory state is snapshotted at that moment, the two streams are not aligned, and records from one of them may be processed again on recovery.
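The rollback-and-replay idea can be illustrated with a single-threaded word count in plain Python. This sketch is an assumption-laden toy, not Flink’s checkpoint algorithm: the function `run_with_checkpoints`, the in-memory snapshot, and the simulated failure are all hypothetical, and the source is a list that can be re-read from any offset.

```python
from typing import Dict, List, Optional, Tuple


def run_with_checkpoints(
    words: List[str],
    checkpoint_every: int,
    fail_at: Optional[int] = None,
) -> Dict[str, int]:
    """Count words, snapshotting (offset, counts) every few records.

    If fail_at is set, the job "crashes" once just before reading that
    offset, rolls back to the last snapshot, and replays from its offset.
    """
    counts: Dict[str, int] = {}
    snapshot: Tuple[int, Dict[str, int]] = (0, {})
    offset = 0
    failed = False
    while offset < len(words):
        if fail_at is not None and offset == fail_at and not failed:
            failed = True
            # Roll back both the source position and the state.
            offset, saved_counts = snapshot
            counts = dict(saved_counts)
            continue
        word = words[offset]
        counts[word] = counts.get(word, 0) + 1
        offset += 1
        if offset % checkpoint_every == 0:
            snapshot = (offset, dict(counts))
    return counts


words = ["a", "b", "a", "c", "a", "b"]
print(run_with_checkpoints(words, checkpoint_every=2))
print(run_with_checkpoints(words, checkpoint_every=2, fail_at=5))
```

Because the source offset is stored together with the counts, the records processed after the last snapshot are replayed exactly once, and the run with a failure ends with the same counts as the run without one.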

Checkpointing is enabled and configured in code as follows:

  • Start with env.enableCheckpointing(1000), which sets the interval between two checkpoints to 1 second. The more frequent the checkpoints, the less data has to be replayed during recovery, but the more I/O is consumed by checkpointing.
  • Next, set the checkpointing mode to EXACTLY_ONCE, which uses barrier alignment to ensure that messages are neither lost nor duplicated.
  • setMinPauseBetweenCheckpoints(500) means that at least 500 ms must pass between the end of one checkpoint and the start of the next. For example, if a checkpoint takes 700 ms, then with a 1000 ms interval the next checkpoint would be due only 300 ms after the previous one finished. Because 300 ms is less than 500 ms, the next checkpoint waits another 200 ms. This prevents overly frequent checkpoints from slowing down business processing.
  • setCheckpointTimeout sets the checkpoint timeout. If a checkpoint has not completed within 1 minute, it times out and fails. setMaxConcurrentCheckpoints sets how many checkpoints may be in progress at the same time and can be chosen according to your needs.
  • enableExternalizedCheckpoints specifies that the current checkpoint should be retained when the job is canceled. By default, checkpoints are deleted when the whole job is canceled. A checkpoint is a job-level save point.
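The interplay between the checkpoint interval and the minimum pause in the list above comes down to simple arithmetic, sketched here in plain Python. The function `next_checkpoint_start` is hypothetical and models only the rule described in the text, not Flink’s actual scheduler.

```python
def next_checkpoint_start(
    previous_start_ms: int,
    duration_ms: int,
    interval_ms: int = 1000,
    min_pause_ms: int = 500,
) -> int:
    """Return when the next checkpoint may start, in milliseconds.

    It starts at the regular interval unless that would leave less than
    min_pause_ms after the previous checkpoint finished.
    """
    scheduled = previous_start_ms + interval_ms
    earliest_allowed = previous_start_ms + duration_ms + min_pause_ms
    return max(scheduled, earliest_allowed)


# A 700 ms checkpoint leaves only 300 ms before the 1000 ms mark,
# so the next one is pushed back by 200 ms.
print(next_checkpoint_start(previous_start_ms=0, duration_ms=700))
# A 300 ms checkpoint leaves 700 ms, so the regular interval applies.
print(next_checkpoint_start(previous_start_ms=0, duration_ms=300))
```

With the values from the text, the first call yields 1200 and the second yields 1000.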

As mentioned above, besides failure recovery, state also needs to be redistributed when parallelism is adjusted manually. Adjusting parallelism manually requires restarting the job, and by then the checkpoint no longer exists. How can the job restore its data in that case?

On the one hand, Flink allows checkpoints to be retained in external storage when the job is canceled. On the other hand, Flink has another mechanism called Savepoint.

A Savepoint is similar to a Checkpoint in that it also saves state to external storage, from which a job can be recovered after it fails. What are the differences between Savepoint and Checkpoint?

  • In terms of triggering and management, a Checkpoint is triggered and managed automatically by Flink, while a Savepoint is triggered and managed manually by the user.
  • In terms of purpose, a Checkpoint is for fast recovery when a task hits an exception, such as network jitter or a timeout. A Savepoint is a planned backup that lets a job be stopped and later resumed, for example to modify code or adjust parallelism.
  • In terms of characteristics, a Checkpoint is lightweight, is used to recover automatically from failures, and is cleared by default after the job stops. A Savepoint is more persistent and is stored in a standard format, which allows the code or configuration to change. Recovering from a Savepoint requires starting the job and manually specifying the path to recover from.

2. Available state storage options

The first option for checkpoint storage is MemoryStateBackend. Its constructor takes the maximum state size (maxStateSize) and a flag for whether to take snapshots asynchronously. The state itself is stored in the memory of the TaskManager nodes. Because memory capacity is limited, the maxStateSize of a single state defaults to 5 MB, and maxStateSize must not exceed akka.framesize, which defaults to 10 MB. Checkpoints are stored in JobManager memory, so the total size cannot exceed the JobManager memory. Recommended scenarios: local testing, nearly stateless jobs such as ETL, and cases where the JobManager is unlikely to fail or its failure has little impact. MemoryStateBackend is not recommended for production.

The second option is FsStateBackend, which is based on a file system. Its constructor takes a file path and a flag for whether to take snapshots asynchronously. State is still held in TaskManager memory, but without the 5 MB upper limit of MemoryStateBackend. Checkpoints are stored in an external file system (local or HDFS), which removes the limit imposed by JobManager memory. In terms of capacity, the total state on a single TaskManager cannot exceed its memory, and the total size cannot exceed the capacity of the configured file system. Recommended scenarios: jobs with ordinary state, such as minute-level window aggregation or joins, and jobs that need HA.

The third option is RocksDBStateBackend. RocksDB is a key-value storage system. Like other key-value stores, it first keeps state in memory and writes it to disk when memory is nearly full. Note that RocksDB does not support synchronous checkpoints, so the constructor has no option for synchronous snapshots. However, RocksDB supports incremental checkpoints and is currently the only backend that does, which means the full state does not have to be written out every time. Its checkpoints are stored in an external file system (local or HDFS). In terms of capacity, the total state on a single TaskManager must not exceed its memory plus disk, the maximum size of a single key is 2 GB, and the total size must not exceed the capacity of the configured file system. Recommended scenarios: jobs with very large state, such as day-level window aggregation, jobs that need HA, and jobs that do not require high state read/write performance.

Summary

1. Why use state?

As described earlier, a job is stateful because its logic is stateful, and the logic is stateful because the data is correlated. A single record cannot represent all the information, so state is needed to satisfy the business logic.

2. Why manage state?

Given that state is used, why does it have to be managed? Real-time jobs must run continuously 24/7, so they have to cope with the impact of unreliable factors.

3. How to select the state type and storage mode?

How do you choose the state type and the storage option? Based on the content above, first analyze the business scenario clearly, such as what you want to do and how large the state is. Then compare the advantages and disadvantages of each option and select the state type and storage mode that fit the requirements.

Video review: www.bilibili.com/video/av497…

