Stateful computation, as the basis for fault tolerance and data consistency guarantees, is one of the essential features of real-time computing today. Popular real-time computing engines, including Google Dataflow, Flink, Spark (Structured) Streaming, and Kafka Streams, all provide built-in State support. State enables real-time applications to store metadata and intermediate data without relying on external databases. In some cases, applications can even store result data directly in State, which leads the industry to ask: what is the relationship between State and databases? Can State replace a database?
The Flink community was among the first to explore this topic. Its efforts fall into two lines. The first is QueryableState, the ability to access State through a query interface while the job is running. The second is the ability to query and modify State offline through its dump files (Savepoints), namely the upcoming Savepoint Processor API.
QueryableState
Flink 1.2, released in 2017, introduced QueryableState, which lets users query the contents of a job's State through a dedicated client [1]. This means Flink applications can provide real-time access to computed results without relying on any external storage beyond the State storage itself.
Figure: real-time data access provided solely through QueryableState
However, although QueryableState is promising in vision, it is still in Beta and not production-ready, because it depends on changes to the underlying architecture and its functionality is limited. To address this, Tencent engineer Yang Hua some time ago proposed an improvement plan for QueryableState [2]. In the ensuing mailing-list discussion, community members held differing views on whether QueryableState could replace a database. Based on my personal opinion, I summarize the main advantages and disadvantages of State as Database below.
Advantages:
-
Lower data latency. Typically, a Flink application's results must be synchronized to an external database, for example by periodically emitting window results. Such synchronization is usually timer-based, which introduces delay and leads to the awkward situation where the computation is real-time but the queries are not. Querying State directly avoids this problem.
-
Stronger data consistency guarantees. Depending on the characteristics of the external storage, a Flink connector or a custom SinkFunction can provide different consistency guarantees. For example, with HBase, which does not support multi-row transactions, Flink can only achieve exactly-once results through idempotent business logic. State, by contrast, is covered by Flink's exactly-once guarantee out of the box.
-
Resource savings. Synchronizing less data to external storage saves serialization and network transfer costs, and of course database costs.
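The idempotency argument in the consistency point can be illustrated with a small plain-Java sketch. It uses no Flink or HBase APIs; `Update`, `appendSink` and `upsertSink` are hypothetical names. It contrasts an append-style sink with an HBase-style put by row key when the writes made before a failure are replayed after the job restores from a checkpoint.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class IdempotentSinkSketch {
    // Hypothetical record: a row key plus the latest aggregate value for that key.
    record Update(String rowKey, long value) {}

    // Append-style sink: every write adds a new entry, so replayed writes are duplicated.
    static List<Update> appendSink(List<Update> writes) {
        return new ArrayList<>(writes);
    }

    // Upsert-style sink (like an HBase put by row key): writing the same update
    // twice leaves the table unchanged, so replays after a failure are harmless.
    static Map<String, Long> upsertSink(List<Update> writes) {
        Map<String, Long> table = new HashMap<>();
        for (Update u : writes) {
            table.put(u.rowKey(), u.value());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Update> firstAttempt = List.of(new Update("A", 5000), new Update("C", 2000));
        // Simulate a restart from the last checkpoint: the same updates are
        // written again, followed by a new one.
        List<Update> withReplay = new ArrayList<>(firstAttempt);
        withReplay.addAll(firstAttempt);
        withReplay.add(new Update("A", 8600));

        System.out.println("append rows: " + appendSink(withReplay).size());
        System.out.println("upsert table: " + upsertSink(withReplay));
    }
}
```

With the replay, the append sink ends up with five rows, two of them duplicates, while the upsert sink holds exactly one row per key with the latest value. This is why idempotent writes can give exactly-once results without transactions, and also why they depend on the business logic being expressible as overwrites.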
Disadvantages:
-
Weaker SLA guarantees. Database technology is mature, with extensive accumulated experience in availability, fault tolerance, and operations, whereas State is still at an early stage. Moreover, given its positioning, a Flink job suffers downtime during version upgrades and maintenance, as well as automatic restarts when it hits errors, so it cannot match the high availability of data access that a database offers.
-
Possible job instability. A careless ad-hoc query may need to scan and return an enormous amount of data, which puts significant strain on the system and may disrupt the normal execution of the job. Even reasonable queries can hurt job efficiency when issued concurrently in large numbers.
-
Limited data volume. At runtime, State is stored mainly in the TaskManager's local memory and disk. If State grows too large, the TaskManager may run out of memory (OOM) or disk space. In addition, a large State means large checkpoints, which can cause checkpoint timeouts and significantly prolong job recovery.
-
Only the most basic queries are supported. State supports only simple lookups on its data structures, lacks the computing power of a relational database, and does not support optimizations such as predicate push-down.
-
Read-only access. At runtime, State can only be changed by the job itself; any other modification has to go through the Savepoint Processor API described below.
Overall, the disadvantages of using State in place of a database currently far outweigh the advantages, but for jobs where data availability is not critical, using State as a database is perfectly reasonable. Because the two are positioned differently, Flink State is unlikely to replace databases entirely in the short term, but there is no doubt that State will evolve toward databases in how its data is accessed.
Savepoint Processor API
The Savepoint Processor API is a new feature recently proposed by the community (see FLIP-43 [3]) for analyzing and modifying Savepoints, the dump files of State, offline, or for building an initial Savepoint directly from data. The Savepoint Processor API is part of the State Management effort under Flink State Evolution. If QueryableState is the DQL, then Flink State Evolution is the DML, and the Savepoint Processor API is the most important part of that DML.
The predecessor of the Savepoint Processor API is Bravo [4], a third-party project. Its main idea is to provide conversion between Savepoints and DataSets. A typical workflow reads a Savepoint into a DataSet, modifies the DataSet, and writes it out as a new Savepoint. This suits the following scenarios:
-
Analyze job State to discover its patterns and regularities
-
Troubleshooting or auditing
-
Bootstrap the initial State for a new application
-
Modify a Savepoint, for example to:
-
Change the maximum parallelism of a job
-
Make large-scale schema changes
-
Fix problematic State
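The read, modify, write workflow above can be sketched in plain Java. This is not the Bravo or Savepoint Processor API: a `Map` stands in for one operator's keyed state inside a Savepoint, a `List` of `StateRow` stands in for the DataSet, and all names and values (including the corrupted entry) are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.UnaryOperator;

public class SavepointRewriteSketch {
    // Hypothetical stand-in for one row of keyed state read out of a savepoint.
    record StateRow(String key, long value) {}

    // "Read": turn an operator's keyed state into a list of rows (a DataSet stand-in).
    static List<StateRow> read(Map<String, Long> operatorState) {
        List<StateRow> rows = new ArrayList<>();
        operatorState.forEach((k, v) -> rows.add(new StateRow(k, v)));
        return rows;
    }

    // "Modify": apply a per-row transformation, e.g. repairing bad values.
    static List<StateRow> modify(List<StateRow> rows, UnaryOperator<StateRow> fix) {
        List<StateRow> fixed = new ArrayList<>();
        for (StateRow r : rows) {
            fixed.add(fix.apply(r));
        }
        return fixed;
    }

    // "Write": build a brand-new state map; the original is never mutated.
    static Map<String, Long> write(List<StateRow> rows) {
        Map<String, Long> newState = new LinkedHashMap<>();
        for (StateRow r : rows) {
            newState.put(r.key(), r.value());
        }
        return newState;
    }

    public static void main(String[] args) {
        Map<String, Long> original = new LinkedHashMap<>();
        original.put("k1", 42L);
        original.put("k2", -1L); // hypothetical corrupted value to repair
        original.put("k3", 7L);

        // Clamp negative values to zero as an example "fix problematic State" step.
        Map<String, Long> repaired = write(modify(read(original),
                r -> r.value() < 0 ? new StateRow(r.key(), 0L) : r));
        System.out.println("original: " + original);
        System.out.println("repaired: " + repaired);
    }
}
```

The point worth noting is that `write` produces a new map rather than editing the input, mirroring how the workflow writes a new Savepoint instead of modifying the original in place.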
A Savepoint, as a dump file of State, exposes data query and modification through the Savepoint Processor API, much like an offline database. However, State concepts still differ considerably from those of a typical relational database. FLIP-43 draws analogies between the two and summarizes these differences.
First, a Savepoint physically stores the states of multiple operators. The states of different operators are independent of each other, much like tables in different namespaces of a database. Thus a Savepoint corresponds to a database, and a single operator, identified by its uid, corresponds to a namespace.
| Database | Savepoint |
| --- | --- |
| Namespace | Uid |
| Table | State |
For tables, however, the corresponding Savepoint concept depends on the State type. State comes in three kinds: Operator State, Keyed State, and Broadcast State. Operator State and Broadcast State are non-partitioned State, meaning they are not tied to a key, whereas Keyed State is partitioned State. For non-partitioned State, each state is a table, and each element of the state is a row in that table. For partitioned State, all states under the same operator correspond to one table. Like an HBase table, it has a row key, and each state corresponds to a column.
For example, suppose we have a data stream of player scores and time spent online. We want to record the total score and total time of each player group with Keyed State, and the total score and total time across all players with Operator State.
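The two table mappings described above can be sketched as data transformations in plain Java. This only models the analogy and is not FLIP-43's actual API; `keyedTable`, `nonPartitionedTables` and the sample state names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class StateTableView {
    // Partitioned (Keyed) State: all keyed states of one operator form ONE table.
    // Input:  state name -> (state key -> value)
    // Output: row key    -> (state name, i.e. column -> value)
    static Map<String, Map<String, Long>> keyedTable(Map<String, Map<String, Long>> statesByName) {
        Map<String, Map<String, Long>> table = new TreeMap<>();
        statesByName.forEach((column, values) ->
                values.forEach((rowKey, value) ->
                        table.computeIfAbsent(rowKey, k -> new LinkedHashMap<>()).put(column, value)));
        return table;
    }

    // Non-partitioned (Operator or Broadcast) State: each state is its own table,
    // and each element of the state is one row of that table.
    static Map<String, List<Long>> nonPartitionedTables(Map<String, List<Long>> statesByName) {
        Map<String, List<Long>> tables = new LinkedHashMap<>();
        statesByName.forEach((name, elements) -> tables.put(name, List.copyOf(elements)));
        return tables;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> keyed = new LinkedHashMap<>();
        keyed.put("state_x", Map.of("k1", 1L, "k2", 2L));
        keyed.put("state_y", Map.of("k1", 10L));
        System.out.println("keyed table: " + keyedTable(keyed));

        Map<String, List<Long>> nonPartitioned = new LinkedHashMap<>();
        nonPartitioned.put("state_z", List.of(7L, 8L));
        System.out.println("non-partitioned tables: " + nonPartitionedTables(nonPartitioned));
    }
}
```

In the keyed view, a key that is missing from one state simply has no value in that column, much like a sparse HBase row.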
The data stream input over time is as follows:
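Score stream:

| user_id | user_name | user_group | score |
| --- | --- | --- | --- |
| 1001 | Paul | A | 5,000 |
| 1002 | Charlotte | A | 3,600 |
| 1003 | Kate | C | 2,000 |
| 1004 | Robert | B | 3,900 |

Time stream:

| user_id | user_name | user_group | time |
| --- | --- | --- | --- |
| 1001 | Paul | A | 1,800 |
| 1002 | Charlotte | A | 1,200 |
| 1003 | Kate | C | 600 |
| 1004 | Robert | B | 2,000 |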
With Keyed State, we register two MapStates, group_score and group_time, to hold each group's total score and total time, key the stream by user_group, and accumulate both metrics into State. The resulting table is as follows:
| user_group | group_score | group_time |
| --- | --- | --- |
| A | 8,600 | 3,000 |
| C | 2,000 | 600 |
| B | 3,900 | 2,000 |
In contrast, if we use Operator State to record the total score and total time (with parallelism set to 1), we register two states, total_score and total_time, and get two tables:
| total_score |
| --- |
| 14,500 |

| total_time |
| --- |
| 5,600 |
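The numbers in the tables above can be checked with a plain-Java sketch that replays the example input. It is not a Flink job: it assumes the score and time streams are merged into one sequence of hypothetical `Event` records tagged with a metric name, uses a map keyed by `user_group` to mimic keyBy plus Keyed State, and uses a single running sum to mimic Operator State at parallelism 1.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class PlayerStateExample {
    // Hypothetical merged event: the example's score and time records, tagged by metric.
    record Event(int userId, String userName, String userGroup, String metric, long amount) {}

    static final List<Event> INPUT = List.of(
            new Event(1001, "Paul", "A", "score", 5000),
            new Event(1002, "Charlotte", "A", "score", 3600),
            new Event(1003, "Kate", "C", "score", 2000),
            new Event(1004, "Robert", "B", "score", 3900),
            new Event(1001, "Paul", "A", "time", 1800),
            new Event(1002, "Charlotte", "A", "time", 1200),
            new Event(1003, "Kate", "C", "time", 600),
            new Event(1004, "Robert", "B", "time", 2000));

    // Keyed State view: one row per user_group, columns {group_score, group_time}.
    static Map<String, long[]> keyedByGroup(List<Event> events) {
        Map<String, long[]> table = new TreeMap<>();
        for (Event e : events) {
            long[] row = table.computeIfAbsent(e.userGroup(), g -> new long[2]);
            row[e.metric().equals("score") ? 0 : 1] += e.amount();
        }
        return table;
    }

    // Operator State view with parallelism 1: a single running total per metric.
    static long total(List<Event> events, String metric) {
        return events.stream()
                .filter(e -> e.metric().equals(metric))
                .mapToLong(Event::amount)
                .sum();
    }

    public static void main(String[] args) {
        keyedByGroup(INPUT).forEach((group, row) ->
                System.out.println(group + " | group_score=" + row[0] + " | group_time=" + row[1]));
        System.out.println("total_score=" + total(INPUT, "score"));
        System.out.println("total_time=" + total(INPUT, "time"));
    }
}
```

With a parallelism greater than 1, each Operator State instance would only see part of the stream, so the single totals here rely on the parallelism-1 setting stated above.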
The relationship between Savepoints and databases should now be clear. A Savepoint's StateBackend determines how State is persisted, which clearly corresponds to a database's storage engine. In MySQL, a statement such as ALTER TABLE xxx ENGINE = InnoDB; changes the storage engine, and MySQL performs the tedious format conversion behind the scenes. A Savepoint, by contrast, cannot easily switch StateBackends, because their storage formats are incompatible. To address this, the community recently created FLIP-41 [5] to further improve the operability of Savepoints.
Conclusion
State as Database is a general trend in the development of real-time computing. The goal is not to replace databases, but to draw on the experience of the database field to extend State's interfaces and make working with State closer to the databases we are familiar with. External access to Flink State falls into real-time online access and offline access and modification, supported by QueryableState and the Savepoint Processor API, respectively.
This article is original content from the Alibaba Cloud Yunqi community and may not be reproduced without permission.