This article focuses on one question: What is the difference between a Map stored in ValueState and a MapState?

If you do not understand the difference and use ValueState to store large objects, the following problems are likely to occur in your production environment:

· CPU usage saturates (pegged at 100%)

· Throughput drops

1. Conclusion

The differences fall into two areas: performance and TTL.

Performance

· With the RocksDB state backend, MapState performs better than a Map stored in ValueState.

· MapState is strongly recommended in production; avoid storing large objects in ValueState.

· Storing large objects in ValueState can easily saturate the CPU.

· With a heap state backend, the two perform about the same.

TTL

State in Flink supports setting TTL:

· MapState’s TTL applies at the level of each UserKey (UK)

· ValueState’s TTL applies to the entire value of a key

Other

Do not use ValueState to store a List where ListState can be used. MapState and ListState have been heavily optimized, so why give up their better performance? The following sections analyze the underlying implementation of ValueState and MapState in detail to show how these conclusions are reached.

2. What data State needs to store

ValueState stores the key, namespace, and value, abbreviated as <K, N, V>. MapState stores the key, namespace, userKey, and userValue, abbreviated as <K, N, UK, UV>.

Let’s explain these terms.

Key

ValueState and MapState are keyed states, meaning they can only be used after keyBy. The key therefore must be stored in the state.

For example, suppose the stream is keyed by app and there are two apps, app1 and app2. The state storage engine must store app1 or app2 with each state entry so it can tell whether the data belongs to app1 or app2.

Here, app1 and app2 are the keys.

Namespace

The namespace is used to distinguish windows.

Suppose the pv (page view) metric for app1 and app2 has to be counted per hour, which requires an hourly window. For the state engine to tell app1’s pv at 7 o’clock apart from its pv at 8 o’clock, it must add a dimension that identifies the window.

Flink identifies windows with a namespace. This lets the state engine distinguish app1’s state for the 7 o’clock window from its state for the 8 o’clock window.

Value, UserKey, UserValue

ValueState stores the actual state value, which is the pv value in the example above. MapState is similar to a Map and stores key-value pairs. To distinguish them from the keyBy key, Flink calls the key and value of a MapState entry the UserKey and UserValue.

Here’s how the state engine stores this data.

3. How a StateBackend stores, reads, and writes state data

Flink provides three StateBackends: MemoryStateBackend, FsStateBackend, and RocksDBStateBackend.

While a job is running, MemoryStateBackend and FsStateBackend keep state in memory. They differ in where checkpoint snapshots are stored: MemoryStateBackend keeps them in JobManager memory, and FsStateBackend writes them to a file system. RocksDBStateBackend keeps state in a local RocksDB database for the duration of the job.

Therefore, MemoryStateBackend and FsStateBackend are referred to as heap mode, and RocksDBStateBackend is referred to as RocksDB mode.

3.1 How ValueState and MapState are stored in heap mode

Heap mode means that all state data lives in the TaskManager (TM) heap as plain Java objects, so reading and writing state involves no serialization or deserialization. (Checkpointing does involve serialization and deserialization, but it is separate from reading and writing state data, so it is not discussed here.)

In heap mode, both ValueState and MapState are stored in a CopyOnWriteStateMap<K, N, V>.

· Key and Namespace correspond to K and N of CopyOnWriteStateMap respectively.

· Value of ValueState corresponds to V of CopyOnWriteStateMap.

For MapState, the entire Map is treated as the V of CopyOnWriteStateMap. In effect, the Flink engine creates a HashMap to hold the MapState’s key-value pairs.
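The heap layout can be pictured with a small sketch. It is not Flink code. HeapStateMapSketch is a hypothetical stand-in for CopyOnWriteStateMap<K, N, V> that keeps only the (key, namespace) to V addressing and drops copy-on-write snapshots. putViaValueState and putViaMapState are hypothetical helpers, and the window namespace is modeled as a String such as "07:00" for readability. In both paths the stored V is a plain HashMap.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for CopyOnWriteStateMap<K, N, V>: entries are
// addressed by the (key, namespace) pair and values stay live Java objects,
// so reads and writes involve no serialization. Copy-on-write snapshots are omitted.
class HeapStateMapSketch<K, N, V> {
    private final Map<Map.Entry<K, N>, V> table = new HashMap<>();

    V get(K key, N namespace) {
        return table.get(Map.entry(key, namespace));
    }

    void put(K key, N namespace, V value) {
        table.put(Map.entry(key, namespace), value);
    }

    int size() {
        return table.size();
    }
}

class HeapModeDemo {
    // Map in ValueState: user code creates the HashMap and stores the whole map as V.
    static void putViaValueState(HeapStateMapSketch<String, String, HashMap<String, Long>> state,
                                 String key, String window, String userKey, long userValue) {
        HashMap<String, Long> map = state.get(key, window);
        if (map == null) {
            map = new HashMap<>();
        }
        map.put(userKey, userValue);
        state.put(key, window, map);
    }

    // MapState: conceptually the engine does the same work on the user's behalf,
    // creating the HashMap that becomes V the first time this (key, window) is written.
    static void putViaMapState(HeapStateMapSketch<String, String, HashMap<String, Long>> state,
                               String key, String window, String userKey, long userValue) {
        HashMap<String, Long> map = state.get(key, window);
        if (map == null) {
            map = new HashMap<>();
            state.put(key, window, map);
        }
        map.put(userKey, userValue);
    }
}
```

The namespace is part of the address, so app1’s 07:00 and 08:00 windows end up as two separate entries. Each entry holds its own HashMap.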

For details on how CopyOnWriteStateMap is implemented, see the article "A 10,000-word explanation of CopyOnWriteStateTable in Flink".

So, in heap mode, what is the difference between a Map stored in ValueState and a MapState?

In heap mode, there is no difference.

With a Map in ValueState, the user manually creates a HashMap and puts it into the state engine as V. With MapState, the Flink engine creates the HashMap for the user and puts it into the state engine as V.

In both cases the storage structure is CopyOnWriteStateMap<K, N, HashMap>. The only difference is who creates the HashMap: the user (ValueState) or the Flink engine (MapState).

3.2 How ValueState and MapState are stored in RocksDB mode

RocksDB mode means that all state data is stored in a local RocksDB database on the TaskManager (TM). RocksDB is a key-value store in which every key and value is a byte array. Whether the state is ValueState or MapState, objects must be serialized to binary and then stored in RocksDB as key-value pairs.

■ 3.2.1 How ValueState maps to RocksDB key-value pairs

ValueState needs to store a key, a namespace, and a value.

1. Serialize the ValueState key into a byte array

2. Serialize the ValueState namespace into a byte array

3. Concatenate the two byte arrays to form the RocksDB key

4. Serialize the ValueState value into a byte array to form the RocksDB value

You can then write to RocksDB.

The same logic is used to query data: serialize the key and namespace together as the key of RocksDB, query in RocksDB, and deserialize the queried byte array to obtain ValueState value.

This is the read and write process of ValueState in RocksDB mode.
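The write and read paths above can be sketched as follows. The sketch assumes String keys and namespaces, a long value, and a TreeMap ordered by unsigned bytes standing in for RocksDB (RocksDB also keeps its keys sorted as byte arrays). ValueStateRocksSketch is a hypothetical name. The length fields and KeyGroupId prefix described in 3.4 and 3.5 are intentionally left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

// Hypothetical sketch of the ValueState -> RocksDB mapping, assuming String key and
// namespace and a long value. A TreeMap with unsigned byte ordering stands in for RocksDB.
// Length fields and the KeyGroupId prefix (sections 3.4 and 3.5) are deliberately omitted.
class ValueStateRocksSketch {
    private final TreeMap<byte[], byte[]> db = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    // Steps 1-3: serialize the key and the namespace, then concatenate the two arrays.
    static byte[] rocksKey(String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] n = namespace.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + n.length).put(k).put(n).array();
    }

    // Step 4 and the write: serialize the value and store it under the composite key.
    void update(String key, String namespace, long value) {
        db.put(rocksKey(key, namespace), ByteBuffer.allocate(Long.BYTES).putLong(value).array());
    }

    // Read path: rebuild the same composite key, look it up, and deserialize the bytes.
    Long value(String key, String namespace) {
        byte[] bytes = db.get(rocksKey(key, namespace));
        return bytes == null ? null : ByteBuffer.wrap(bytes).getLong();
    }

    int rowCount() {
        return db.size();
    }
}
```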

■ 3.2.2 How MapState maps to RocksDB key-value pairs

MapState contains a key, namespace, userKey, and userValue.

1. Serialize the MapState key into a byte array

2. Serialize the MapState namespace into a byte array

3. Serialize the MapState userKey into a byte array

4. Concatenate the three byte arrays to form the RocksDB key

5. Serialize the MapState userValue into a byte array to form the RocksDB value

You can then write to RocksDB.

The same logic is used to query data: the key, namespace, and userKey are serialized and concatenated to form the RocksDB key, which is looked up in RocksDB. The returned byte array is deserialized to obtain the MapState’s userValue.

This is the read and write process of MapState in RocksDB mode.
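The same kind of sketch for MapState differs only in one respect: the userKey becomes part of the RocksDB key. The assumptions are unchanged: String key, namespace, and userKey; a long userValue; a TreeMap in place of RocksDB; no length fields or KeyGroupId; and a hypothetical class name.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

// Hypothetical sketch of the MapState -> RocksDB mapping, assuming String key, namespace
// and userKey and a long userValue. A TreeMap with unsigned byte ordering stands in for
// RocksDB; length fields and the KeyGroupId prefix (sections 3.4 and 3.5) are omitted.
class MapStateRocksSketch {
    private final TreeMap<byte[], byte[]> db = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    // Steps 1-4: serialize key, namespace and userKey, then concatenate all three.
    static byte[] rocksKey(String key, String namespace, String userKey) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] n = namespace.getBytes(StandardCharsets.UTF_8);
        byte[] uk = userKey.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(k.length + n.length + uk.length).put(k).put(n).put(uk).array();
    }

    // Step 5 and the write: each (userKey, userValue) pair becomes its own RocksDB row.
    void put(String key, String namespace, String userKey, long userValue) {
        db.put(rocksKey(key, namespace, userKey),
               ByteBuffer.allocate(Long.BYTES).putLong(userValue).array());
    }

    // Read path: rebuild the three-part key and deserialize only this userValue.
    Long get(String key, String namespace, String userKey) {
        byte[] bytes = db.get(rocksKey(key, namespace, userKey));
        return bytes == null ? null : ByteBuffer.wrap(bytes).getLong();
    }

    int rowCount() {
        return db.size();
    }
}
```

Writing 100 userKeys under one key and namespace yields 100 RocksDB rows, one per userKey.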

3.3 In RocksDB mode, what is the difference between a Map stored in ValueState and a MapState?

■ 3.3.1 If the Map holds 100 key-value pairs, how does each approach store the data?

When ValueState stores a Map, the Flink engine treats the entire Map as one large value. In RocksDB, the Map with 100 key-value pairs is serialized into a single byte array that serves as the RocksDB value, so all of it occupies a single row.

MapState stores the 100 key-value pairs as 100 separate rows in RocksDB, one per userKey.

■ 3.3.2 Modifying one key-value pair in the Map

With ValueState, the entire Map must be read out of RocksDB even though only one key-value pair in it needs to change. The steps are:

1. Serialize the key and namespace into byte arrays to form the RocksDB key

2. Read the byte array of the value for that key from RocksDB

3. Deserialize the byte array into the Map

4. Modify the Map in heap memory

5. Serialize the entire Map into a byte array and write it back to RocksDB
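The five ValueState steps can be sketched as follows. ValueStateMapUpdateSketch is hypothetical. A TreeMap stands in for RocksDB, and a simple length-prefixed encoding stands in for Flink’s serializers. lastUpdateBytes is an illustrative counter of the value bytes deserialized plus serialized by the most recent update.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: updating one entry of a Map kept in ValueState on RocksDB.
// A TreeMap stands in for RocksDB and a simple length-prefixed format stands in for
// Flink's serializers. lastUpdateBytes = value bytes read + written by the last update.
class ValueStateMapUpdateSketch {
    private final TreeMap<byte[], byte[]> db = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);
    long lastUpdateBytes;

    static byte[] rocksKey(String key, String namespace) {
        return (key + namespace).getBytes(StandardCharsets.UTF_8); // simplified key + namespace
    }

    // Layout: entry count, then per entry: key length, key bytes, 8-byte value.
    static byte[] serializeMap(Map<String, Long> map) {
        int size = Integer.BYTES;
        for (String k : map.keySet()) {
            size += Integer.BYTES + k.getBytes(StandardCharsets.UTF_8).length + Long.BYTES;
        }
        ByteBuffer buf = ByteBuffer.allocate(size).putInt(map.size());
        for (Map.Entry<String, Long> e : map.entrySet()) {
            byte[] k = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putInt(k.length).put(k).putLong(e.getValue());
        }
        return buf.array();
    }

    static HashMap<String, Long> deserializeMap(byte[] bytes) {
        ByteBuffer buf = ByteBuffer.wrap(bytes);
        int count = buf.getInt();
        HashMap<String, Long> map = new HashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] k = new byte[buf.getInt()];
            buf.get(k);
            map.put(new String(k, StandardCharsets.UTF_8), buf.getLong());
        }
        return map;
    }

    // Adds delta to one userKey; the whole Map is read, rebuilt, and rewritten.
    void addTo(String key, String namespace, String userKey, long delta) {
        byte[] rk = rocksKey(key, namespace);                              // step 1
        byte[] stored = db.get(rk);                                        // step 2
        HashMap<String, Long> map = stored == null
                ? new HashMap<String, Long>() : deserializeMap(stored);    // step 3
        map.merge(userKey, delta, Long::sum);                              // step 4
        byte[] updated = serializeMap(map);                                // step 5
        db.put(rk, updated);
        lastUpdateBytes = (stored == null ? 0 : stored.length) + updated.length;
    }

    Long get(String key, String namespace, String userKey) {
        byte[] stored = db.get(rocksKey(key, namespace));
        return stored == null ? null : deserializeMap(stored).get(userKey);
    }
}
```

Suppose the Map holds 100 entries with userKeys "u0" to "u99". This encoding produces a 1,494-byte value, so incrementing a single entry deserializes and re-serializes 2,988 bytes. The cost grows with the size of the whole Map, not with the size of the change.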

With MapState, the key, namespace, and userKey together locate exactly the key-value pair to be modified. The steps are:

1. Serialize the key, namespace, and userKey into byte arrays to form the RocksDB key

2. Read the byte array of the value for that key from RocksDB

3. Deserialize the byte array into the userValue

4. Modify the userValue in heap memory

5. Serialize the userValue and write it back to RocksDB under the same key
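For comparison, here is a sketch of the MapState steps under the same assumptions: a TreeMap in place of RocksDB, simplified key concatenation, and a hypothetical class name and byte counter. Only the 8-byte userValue of the targeted userKey is deserialized and re-serialized.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.TreeMap;

// Hypothetical sketch: updating one userKey of a MapState on RocksDB.
// A TreeMap stands in for RocksDB; lastUpdateBytes = value bytes read + written.
class MapStateUpdateSketch {
    private final TreeMap<byte[], byte[]> db = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);
    long lastUpdateBytes;

    static byte[] rocksKey(String key, String namespace, String userKey) {
        return (key + namespace + userKey).getBytes(StandardCharsets.UTF_8); // simplified concat
    }

    // Adds delta to one userKey; only that entry's 8-byte userValue is touched.
    void addTo(String key, String namespace, String userKey, long delta) {
        byte[] rk = rocksKey(key, namespace, userKey);                          // step 1
        byte[] stored = db.get(rk);                                             // step 2
        long current = stored == null ? 0L : ByteBuffer.wrap(stored).getLong(); // step 3
        long next = current + delta;                                            // step 4
        byte[] updated = ByteBuffer.allocate(Long.BYTES).putLong(next).array(); // step 5
        db.put(rk, updated);
        lastUpdateBytes = (stored == null ? 0 : stored.length) + updated.length;
    }

    Long get(String key, String namespace, String userKey) {
        byte[] stored = db.get(rocksKey(key, namespace, userKey));
        return stored == null ? null : ByteBuffer.wrap(stored).getLong();
    }
}
```

Repeat the 100-entry experiment. Incrementing "u5" touches 16 bytes of value data (8 read, 8 written), regardless of how many other userKeys exist.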

■ 3.3.3 Conclusion

When modifying one key-value pair in the Map:

If ValueState is used to store the Map, every modification has to deserialize and re-serialize the entire Map. Serializing and deserializing large objects consumes a lot of CPU and can easily saturate it.

If MapState is used, each modification only needs to serialize and deserialize the single key-value pair for that userKey, which is efficient.

More generally, whenever ValueState holds a large object that is updated frequently, the CPU can easily become saturated.

For example, if a bitmap stored in ValueState has to be updated for every incoming record, the CPU may become saturated.

Some implementation details were omitted above for ease of understanding; they are covered below.

3.4 Directly concatenating the key and namespace may cause RocksDB key conflicts

Suppose a ValueState has two entries:

· Key1 serializes to 0x112233 and namespace1 serializes to 0x4455

· Key2 serializes to 0x1122 and namespace2 serializes to 0x334455

Both entries map to the same RocksDB key, 0x1122334455. Two different key/namespace combinations thus become a single RocksDB entry and can no longer be distinguished.

Solution:

Write the byte length of the key between the key and the namespace, and the byte length of the namespace after the namespace.

Once these two lengths are written, key conflicts can no longer occur; working out why is left to the reader.
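One way to see why the lengths remove the conflict: the combined key can be decoded unambiguously from the end, and anything that decodes uniquely cannot come from two different inputs. The sketch below is hypothetical and assumes fixed-width 4-byte lengths; Flink’s actual encoding of these fields may differ. It reproduces the 0x1122334455 collision with the naive layout and shows that the length-suffixed layout keeps the two entries apart.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

// Hypothetical sketch of the key-conflict problem and the length-based fix.
// Assumption: lengths are fixed-width 4-byte ints (Flink's real encoding may differ).
class CompositeKeySketch {
    // Naive layout: key bytes followed directly by namespace bytes.
    static byte[] naive(byte[] key, byte[] ns) {
        return ByteBuffer.allocate(key.length + ns.length).put(key).put(ns).array();
    }

    // Fixed layout: key | keyLength | namespace | namespaceLength.
    static byte[] withLengths(byte[] key, byte[] ns) {
        return ByteBuffer.allocate(key.length + ns.length + 2 * Integer.BYTES)
                .put(key).putInt(key.length)
                .put(ns).putInt(ns.length)
                .array();
    }

    // Decoding from the end: the last 4 bytes give the namespace length, which locates
    // the namespace; the 4 bytes just before it give the key length, which locates the key.
    // Since every layout decodes back to exactly one (key, namespace), two different
    // inputs can never produce the same bytes.
    static byte[][] decode(byte[] composite) {
        ByteBuffer buf = ByteBuffer.wrap(composite);
        int nsLen = buf.getInt(composite.length - Integer.BYTES);
        int nsStart = composite.length - Integer.BYTES - nsLen;
        int keyLen = buf.getInt(nsStart - Integer.BYTES);
        byte[] key = Arrays.copyOfRange(composite, 0, keyLen);
        byte[] ns = Arrays.copyOfRange(composite, nsStart, nsStart + nsLen);
        return new byte[][] {key, ns};
    }
}
```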

3.5 The RocksDB key also stores the KeyGroupId

For background on key groups, see the article "From KeyGroup to Rescale".

Adding the KeyGroupId is simple: it is serialized before the key and namespace, so it becomes the prefix of the RocksDB key.
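Here is a hypothetical sketch of the prefixed layout. It assumes a 2-byte KeyGroupId written first, the length fields from 3.4 written as fixed 4-byte ints, and a TreeMap ordered by unsigned bytes in place of RocksDB. The KeyGroupId is passed in directly; in Flink it is derived from a hash of the key and the maximum parallelism. RocksDB keeps keys sorted, so putting the KeyGroupId first makes all entries of one key group contiguous. They can then be read with a single range scan, which is convenient when state has to be redistributed by key group, for example during a rescale.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.SortedMap;
import java.util.TreeMap;

// Hypothetical sketch of a KeyGroupId-prefixed RocksDB key. Assumptions: 2-byte
// KeyGroupId (below 65535), fixed 4-byte length fields, TreeMap in place of RocksDB.
class KeyGroupPrefixSketch {
    private final TreeMap<byte[], byte[]> db = new TreeMap<byte[], byte[]>(Arrays::compareUnsigned);

    // Layout: keyGroupId | key | keyLength | namespace | namespaceLength
    static byte[] rocksKey(int keyGroupId, String key, String namespace) {
        byte[] k = key.getBytes(StandardCharsets.UTF_8);
        byte[] n = namespace.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Short.BYTES + k.length + n.length + 2 * Integer.BYTES)
                .putShort((short) keyGroupId)
                .put(k).putInt(k.length)
                .put(n).putInt(n.length)
                .array();
    }

    void put(int keyGroupId, String key, String namespace, byte[] value) {
        db.put(rocksKey(keyGroupId, key, namespace), value);
    }

    // All rows of one key group share the same 2-byte prefix, so in sorted order they
    // form one contiguous range: [prefix(id), prefix(id + 1)).
    SortedMap<byte[], byte[]> scanKeyGroup(int keyGroupId) {
        byte[] from = ByteBuffer.allocate(Short.BYTES).putShort((short) keyGroupId).array();
        byte[] to = ByteBuffer.allocate(Short.BYTES).putShort((short) (keyGroupId + 1)).array();
        return db.subMap(from, to);
    }
}
```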

4. How state TTL works

Flink implements TTL by wrapping the user’s value in an extra layer, the TtlValue class:

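import java.io.Serializable;
import javax.annotation.Nullable;
public class TtlValue<T> implements Serializable {
 @Nullable private final T userValue;     // the user's original value
 private final long lastAccessTimestamp;  // timestamp stored next to the value for TTL checks
 public TtlValue(@Nullable T userValue, long lastAccessTimestamp) {
  this.userValue = userValue;
  this.lastAccessTimestamp = lastAccessTimestamp;
 }
}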

The TtlValue class has two fields. userValue holds the user’s value, and lastAccessTimestamp records when the data was last written (or, depending on the TTL update type, last read).

When TTL is enabled, the value stored in the state is a TtlValue object. The timestamp is saved to the state engine along with the value, so that later queries can use it to determine whether the data has expired.

· ValueState wraps its value in a TtlValue.

· MapState wraps each userValue in a TtlValue.

· ListState wraps each element in a TtlValue.
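The timestamp check on read can be sketched as follows. TtlStateSketch, write, and read are hypothetical names, and time is passed in explicitly for determinism. Treating an entry whose age equals the TTL as expired is an assumption of this sketch. As with Flink’s default setting of never returning expired data, an expired entry reads as absent.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of TTL wrapping, loosely modeled on TtlValue.
// Assumption: an entry whose age is >= ttlMillis counts as expired.
class TtlStateSketch<K, V> {
    // Plays the role of TtlValue: the user value plus the timestamp stored next to it.
    static final class Stamped<T> {
        final T userValue;
        final long lastAccessTimestamp;

        Stamped(T userValue, long lastAccessTimestamp) {
            this.userValue = userValue;
            this.lastAccessTimestamp = lastAccessTimestamp;
        }
    }

    private final Map<K, Stamped<V>> store = new HashMap<>();
    private final long ttlMillis;

    TtlStateSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // On write, wrap the value together with the current time.
    void write(K key, V value, long nowMillis) {
        store.put(key, new Stamped<>(value, nowMillis));
    }

    // On read, compare the stored timestamp with the current time; expired data is
    // treated as absent (and dropped here) instead of being returned to the user.
    V read(K key, long nowMillis) {
        Stamped<V> s = store.get(key);
        if (s == null) {
            return null;
        }
        if (nowMillis - s.lastAccessTimestamp >= ttlMillis) {
            store.remove(key);
            return null;
        }
        return s.userValue;
    }
}
```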

So, with TTL, what is the difference between a Map stored in ValueState and a MapState?

If ValueState holds a Map, the entire Map is treated as a single value and only one timestamp is maintained. Either the entire Map expires, or none of it does.

If 100 key-value pairs are stored in MapState, each of the 100 pairs stores its own timestamp, so each pair expires independently of the others.
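The difference in granularity shows up in a small hypothetical sketch. WholeMapTtl keeps one timestamp for the whole Map, like a single TtlValue wrapping a Map in ValueState. PerEntryTtl keeps one timestamp per userKey, like MapState. Both refresh the timestamp on write only. That update policy, the 1,000 ms TTL, and all names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch contrasting TTL granularity; timestamps are passed explicitly.
class TtlGranularitySketch {
    static final long TTL = 1000L;

    static boolean expired(long timestamp, long now) {
        return now - timestamp >= TTL;
    }

    // ValueState<Map>: one timestamp shared by the whole Map.
    static final class WholeMapTtl {
        final Map<String, Long> map = new HashMap<>();
        long timestamp;

        void put(String userKey, long userValue, long now) {
            map.put(userKey, userValue);
            timestamp = now; // any write refreshes the whole Map
        }

        Long get(String userKey, long now) {
            return expired(timestamp, now) ? null : map.get(userKey);
        }
    }

    // MapState: each userValue carries its own timestamp.
    static final class PerEntryTtl {
        final Map<String, long[]> entries = new HashMap<>(); // {userValue, timestamp}

        void put(String userKey, long userValue, long now) {
            entries.put(userKey, new long[] {userValue, now});
        }

        Long get(String userKey, long now) {
            long[] e = entries.get(userKey);
            return (e == null || expired(e[1], now)) ? null : e[0];
        }
    }
}
```

Write "a" at t=0 and "b" at t=900. At t=1200 the whole-Map version still returns "a", because the write of "b" refreshed the shared timestamp. The per-entry version has already expired "a". At t=2000 the whole Map expires at once.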

5. Summary

This article analyzed, from the underlying implementation, the difference between a Map stored in ValueState and a MapState. The differences are summarized below in terms of performance and TTL.

Performance

· With the RocksDB state backend, MapState performs much better than a Map in ValueState, and large objects stored in ValueState can easily saturate the CPU

· With a heap state backend, the two perform about the same

TTL

Flink state supports TTL, implemented by wrapping the userValue together with a timestamp.

· MapState’s TTL applies at the level of each UserKey (UK)

· ValueState’s TTL applies to the entire value of a key

For ListState, Flink relies on RocksDB’s merge operator feature to map ListState data into RocksDB. If you are interested, see the "Merge Operator Implementation" page of the RocksDB wiki. Link:

Github.com/facebook/ro…