Abstract: This article is compiled from the keynote speech given by Li Yu (Juding), ASF Member, Apache Flink & HBase PMC member, and Alibaba senior technical expert, and Tang Yun, Apache Flink Committer and Alibaba technical expert, at the Flink Forward Asia 2021 Core Technologies session. The main contents include:

  1. State Backend Improvement
  2. Snapshot Improvement
  3. Future Work


State Backend Improvement

The Flink community's state backend module has grown considerably over the past year. Prior to version 1.13, users had no means of monitoring the performance of state-related operators and no good way to know the latency of state read and write operations.

To address this, we introduced state access latency tracking. The basic principle is to take a System.nanoTime timestamp before and after each state access and record the measured latency in a Histogram-type metric.

Two configurations deserve emphasis: the sampling interval and the history size. The smaller the sampling interval, the more accurate the results, but the greater the impact on regular access performance. The more history retained, the more accurate the results, but the larger the memory footprint.
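A minimal sketch of enabling this feature programmatically, assuming the latency-tracking option keys introduced around Flink 1.13 (the exact key names and values below should be checked against your Flink version):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class LatencyTrackingExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Turn on latency tracking for keyed state accesses.
        conf.setString("state.backend.latency-track.keyed-state-enabled", "true");
        // Sample one out of every 100 accesses: smaller interval = more accurate, more overhead.
        conf.setString("state.backend.latency-track.sample-interval", "100");
        // Keep the latest 128 samples per histogram: more history = more accurate, more memory.
        conf.setString("state.backend.latency-track.history-size", "128");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job here, then call env.execute().
    }
}
```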

In Flink 1.14, we finally upgraded RocksDB from 5.17 to 6.20. In addition to several bug fixes, the new RocksDB brings features that can be used in Flink 1.14 and 1.15. First, it supports the ARM platform, ensuring that Flink jobs can run on ARM-based machines. Second, it provides more fine-grained WriteBuffer memory control, improving the stability of memory management. In addition, it exposes the deleteRange interface, which greatly improves performance in rescaling scenarios.
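To illustrate why deleteRange matters for rescaling, here is a small, self-contained sketch using the RocksDB Java API directly; the keys and path are made up for the example. Dropping an entire key range in one call avoids iterating over and deleting keys one by one:

```java
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class DeleteRangeExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options().setCreateIfMissing(true);
             RocksDB db = RocksDB.open(options, "/tmp/rocksdb-delete-range-demo")) {
            db.put("key-000".getBytes(), "v0".getBytes());
            db.put("key-250".getBytes(), "v1".getBytes());
            db.put("key-500".getBytes(), "v2".getBytes());
            // Drop every key in [key-000, key-400) with a single range tombstone,
            // instead of deleting the keys one at a time.
            db.deleteRange("key-000".getBytes(), "key-400".getBytes());
        }
    }
}
```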

As cloud native becomes more and more popular, more and more vendors choose to run Flink jobs in container environments on Kubernetes, which inevitably requires considering how to run stably under constrained resources, especially constrained memory. RocksDB, born in 2010, was somewhat lacking in this area, and Flink 1.10 introduced memory control for it for the first time. Over the past year, further improvements have been made in RocksDB memory management.

Let’s review the memory aspects of RocksDB, but before we get to that, we need to understand how Flink uses state and RocksDB.

  • First, each state declared in Flink corresponds to a column family in RocksDB. Each column family allocates its memory independently (for example, its own memtables), so column families are physically isolated from one another.

  • Second, Flink does not limit the number of states a user can declare in an operator, so it does not limit the number of column families.

  • Finally, under the slot-sharing mechanism, a single slot can contain multiple operators that each hold keyed state.

For these three reasons, it is theoretically possible to use Flink in a way that leads to unrestricted memory usage, even without considering RocksDB’s own memory management limitations.
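As a concrete illustration of the first two points, the hypothetical operator below declares two keyed states; under the RocksDB state backend, each state descriptor maps to its own column family (all names here are made up for the example):

```java
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Hypothetical operator with two keyed states; with the RocksDB state backend,
// each ValueStateDescriptor below becomes its own column family with its own memory.
public class MultiStateFunction extends RichFlatMapFunction<String, String> {

    private transient ValueState<Long> counter;    // -> column family "counter"
    private transient ValueState<String> lastSeen; // -> column family "last-seen"

    @Override
    public void open(Configuration parameters) {
        counter = getRuntimeContext().getState(
                new ValueStateDescriptor<>("counter", Long.class));
        lastSeen = getRuntimeContext().getState(
                new ValueStateDescriptor<>("last-seen", String.class));
    }

    @Override
    public void flatMap(String value, Collector<String> out) throws Exception {
        Long current = counter.value();
        counter.update(current == null ? 1L : current + 1);
        lastSeen.update(value);
        out.collect(value);
    }
}
```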

The figure above shows multiple RocksDB instances (for example, those of a SQL job with several stateful operators) sharing one WriteBufferManager and its corresponding block cache. The WriteBufferManager accounts the memory it hands out to WriteBuffers against the block cache, while the block cache itself holds the data-related blocks: data blocks, index blocks, and filter blocks. Roughly speaking, the WriteBuffers act as the write cache and the block cache acts as the read cache.

Thus, the WriteBufferManager works in tandem with the block cache by doing its memory bookkeeping inside the block cache.

When a WriteBuffer requests memory, the WriteBufferManager reserves it in the block cache in units of IO blocks. By default an IO block is 1/8 of the WriteBuffer size; with the default WriteBuffer configuration of 64 MB, an IO block is therefore 8 MB, and that 8 MB reservation is in turn split into dummy entries allocated across the block cache's shards. In particular, Flink's RocksDB upgrade reduces the minimum dummy-entry unit to 256 KB, lowering the probability of memory overuse.
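The following is a simplified, self-contained sketch of this sharing arrangement using the RocksDB Java API directly. It mirrors in spirit what the Flink RocksDB state backend wires up internally, but the sizes and path are illustrative assumptions:

```java
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBufferManager;

public class SharedRocksDbMemoryExample {
    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        // One shared block cache (the "read cache"); the WriteBufferManager also
        // does its bookkeeping inside this cache.
        Cache blockCache = new LRUCache(256 * 1024 * 1024);
        WriteBufferManager writeBufferManager =
                new WriteBufferManager(128 * 1024 * 1024, blockCache);

        Options options = new Options()
                .setCreateIfMissing(true)
                .setWriteBufferSize(64 * 1024 * 1024)     // default-sized 64 MB WriteBuffer
                .setWriteBufferManager(writeBufferManager)
                .setTableFormatConfig(
                        new BlockBasedTableConfig().setBlockCache(blockCache));

        // Several RocksDB instances opened with these options would all charge
        // their memtable memory against the same block cache.
        try (RocksDB db = RocksDB.open(options, "/tmp/rocksdb-shared-memory-demo")) {
            db.put("key".getBytes(), "value".getBytes());
        }
    }
}
```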

Because RocksDB is designed for concurrent access, a cache consists of multiple shards, which makes the memory accounting more complicated.

Inside RocksDB, WriteBuffer memory is allocated in units of arena blocks generated by the internal implementation and accounted for by the WriteBufferManager. When a mutable WriteBuffer is converted to an immutable one and flushed to disk, the default policy can trigger that flush prematurely while a new arena block is being requested, even if only a small amount of data has actually been written, and especially when there are many arena blocks. From the user's perspective this results in a large number of small SST files and poor overall read and write performance, so the Flink community added a dedicated sanity check for the arena block size configuration.

At present, RocksDB's memory accounting is not airtight, so some extra memory must be reserved for RocksDB to overuse in certain scenarios. Mapping this onto the Flink process memory model shown above, the natural place to reserve that headroom is the JVM overhead, which prevents RocksDB overuse from exceeding the container limit. The table on the left shows the default configuration values for JVM overhead; to fix it at 512 MB, simply set both taskmanager.memory.jvm-overhead.min and taskmanager.memory.jvm-overhead.max to 512 MB.
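For illustration, a hedged sketch of pinning JVM overhead at 512 MB programmatically; in production these options usually live in flink-conf.yaml rather than in code:

```java
import org.apache.flink.configuration.Configuration;

public class JvmOverheadConfigExample {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Setting min and max to the same value fixes JVM overhead at 512 MB,
        // leaving headroom for RocksDB to temporarily exceed its managed budget.
        conf.setString("taskmanager.memory.jvm-overhead.min", "512m");
        conf.setString("taskmanager.memory.jvm-overhead.max", "512m");
        System.out.println(conf);
    }
}
```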

Data blocks, index blocks, and filter blocks compete for space in memory-limited scenarios. The blocks in the figure above are drawn according to their actual sizes: for a 256 MB SST file, the index block is roughly 0.5 MB, the filter block roughly 5 MB, and data blocks are generally 4 KB to 64 KB. As you can see, this contention causes blocks to be constantly swapped in and out of the cache, which greatly affects read performance.

To address the above problems, we exposed RocksDB's partitioned-index and partitioned-filter functionality to optimize performance under memory constraints. Index and filter blocks are stored hierarchically within the limited memory, which reduces the probability of reading from disk and improves overall performance.
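A sketch of turning this on, assuming the partitioned index/filter option key available since roughly Flink 1.13 (verify the key name against your version):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PartitionedIndexFilterExample {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Keep only the top level of the index/filter hierarchy resident in memory;
        // lower-level partitions are loaded into the block cache on demand.
        conf.setString("state.backend.rocksdb.memory.partitioned-index-filters", "true");

        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment(conf);
        // ... define the job here, then call env.execute().
    }
}
```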

In addition to these stability-related improvements, Flink has also refactored the state-related APIs to make them friendlier to beginners.

The previous API mixed up the state backend, responsible for state reads and writes, with checkpointing, responsible for fault-tolerant backup. For example, MemoryStateBackend and FsStateBackend are identical in terms of state access, both keeping state as objects on the heap; they differ only in where fault-tolerant backups go. Beginners therefore easily confused the two.

The figure above compares the Flink state access and fault-tolerance APIs before and after the refactoring.

In the new API, state access and fault-tolerant backup are configured separately. As you can see, the state reads and writes previously handled by MemoryStateBackend and FsStateBackend are both taken over by HashMapStateBackend.

Their biggest difference lies in checkpoint fault tolerance: one maps to the memory-based JobManagerCheckpointStorage, while the other maps to the file-based FileSystemCheckpointStorage. We believe this API refactoring gives developers a clearer mental model.
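A minimal sketch of the new-style configuration using the Flink 1.13+ API; the checkpoint path and interval are illustrative:

```java
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class NewStateBackendApiExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000);

        // State access: heap objects, as MemoryStateBackend / FsStateBackend did before.
        env.setStateBackend(new HashMapStateBackend());

        // Fault-tolerant backup: checkpoints go to a file system,
        // the counterpart of the old FsStateBackend behaviour.
        env.getCheckpointConfig().setCheckpointStorage("file:///tmp/flink-checkpoints");
        // For the old MemoryStateBackend behaviour, one would instead use
        // the memory-based JobManagerCheckpointStorage.

        // ... define the job here, then call env.execute().
    }
}
```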

Snapshot Improvement

Savepoints are now decoupled from the state backend and are no longer tied to whichever state backend produced them. In previous versions of Flink, different state backends wrote savepoints in different formats, but in the new version the community has unified the savepoint format, so the state backend of a job can be switched seamlessly without losing state.

In addition, the community has further improved the stability of unaligned checkpoints. The buffers in the channels are persisted in advance as in-flight data, as part of the operator state, so the checkpoint no longer has to wait for barrier alignment.

Flink now also supports automatically switching between aligned and unaligned checkpoints: with a global timeout configured, a checkpoint automatically switches to the unaligned mode once its alignment time exceeds that threshold. We believe this feature can further help developers achieve better checkpoint performance.
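A sketch of combining the two, assuming the CheckpointConfig methods available in Flink 1.14; the timeout value is illustrative:

```java
import java.time.Duration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class AlignedTimeoutExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(60_000); // trigger a checkpoint every 60 s

        // Allow unaligned checkpoints, but start each checkpoint aligned and
        // switch to the unaligned mode only if alignment takes longer than 30 s.
        env.getCheckpointConfig().enableUnalignedCheckpoints();
        env.getCheckpointConfig().setAlignedCheckpointTimeout(Duration.ofSeconds(30));

        // ... define the job here, then call env.execute().
    }
}
```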

Future Work

In the future, we will further improve the production usability of the RocksDB backend. On the one hand, we will add key performance indicators, such as the block cache hit ratio, to the standard monitoring metrics to make tuning RocksDB performance easier. On the other hand, we plan to relocate the RocksDB log files into the TaskManager log directory, or redirect them into the TaskManager logs, so that RocksDB log information is easier to inspect for troubleshooting and tuning.

Second, we will further clarify the snapshot semantics of Flink. Currently, there are three types of snapshots in Flink: checkpoints, savepoints, and retained checkpoints.

  • A checkpoint is a system snapshot whose data lifecycle is completely controlled by the Flink framework. It is used for failover when exceptions occur and is automatically deleted once the job stops.
  • Savepoint is responsible for data backup in a unified format. Its life cycle is decoupled from Flink jobs and completely controlled by users. It can be used for version upgrade of Flink jobs, cross-cluster migration, and state-backend switching.
  • The semantics and lifecycle of retained checkpoints, however, are currently vague. A retained checkpoint can outlive the Flink job that created it, but when a new job is restored from it with incremental checkpoints enabled, the new job's checkpoints will reference its data. This makes it difficult for users to determine when it is safe to delete it.

To solve this problem, the community proposed FLIP-193, which requires users to declare whether claim or no-claim mode is used when restoring a job from a retained checkpoint.

  • If claim mode is used, the retained checkpoint's data lifecycle is completely controlled by the new job: once data compaction has run and new snapshots no longer reference the data taken over from the retained checkpoint, the new job can safely delete it;
  • If no-claim mode is used, the new job must not modify the retained checkpoint's data. This means the new job's first snapshot must copy the retained data rather than reference it. Afterwards, you can manually delete the retained checkpoint whenever needed, without worrying about affecting the jobs that were restored from it.

In addition, we plan to provide clearer semantics for user-controlled snapshots in the future and to introduce a savepoint in native format to replace the retained checkpoint.

Finally, a word about the work in progress under FLIP-158. The changelog-based state backend provides fast and stable incremental snapshots using a log-based approach. Compared with the existing incremental snapshot mechanism, it enables a much shorter checkpoint interval, but sacrifices some state data processing latency. This is a trade-off between latency and fault tolerance.
