Editor’s note:
This article introduces the latest progress of the TiFlink project from the TiDB Hackathon 2020 competition, which uses TiKV and Flink to implement strongly consistent materialized views.
The author is Zhang Eggplant, an enthusiast of algorithms, distributed technology and functional programming. Personal blog: io-meter.com/
At the TiDB Hackathon earlier this year, my teammates and I tried to use Flink to add materialized views to TiDB and won the “Best Popularity Award”. Materialized views were clearly a hot topic at this competition: three or four teams were working with Flink alone. It must be admitted that, by the end of the competition, our project was far from complete; although the basic idea had been settled, the final result fell well short of expectations. After more than half a year of intermittent fixes, we can finally release a preview version for you to try. This article introduces our ideas and results.
Compared with the other teams, our main goal was to build strongly consistent materialized views, that is, to guarantee that the materialized view at query time provides an isolation level close to Snapshot Isolation rather than the Eventual Consistency of ordinary stream processing systems. What it takes to achieve this consistency is discussed in more detail below.
Usage overview
Although this is an experimental project, it already offers some handy features, including:
Zero external dependencies: apart from a TiDB cluster and a Flink deployment environment, there is no need to maintain any other component (including Kafka or TiCDC). Because TiFlink reads and writes data directly from TiKV without any intermediate layer, higher throughput, lower latency, and easier maintenance become possible.
Easy-to-use interface: although TiFlink introduces some new concepts to achieve strong consistency, the purpose-built TiFlinkApp interface lets users start a task quickly without manually creating the target table.
Batch-stream unification: after a task starts, it first consumes the current data of the source tables as a batch, then automatically switches to consuming the CDC log. This switch also preserves the consistency of the view.
For more information about how to use TiFlink, see the README. Here is a code snippet to quickly start a task:
TiFlinkApp.newBuilder()
    .setJdbcUrl("jdbc:mysql://root@localhost:4000/test") // Please make sure the user has correct permission
    .setQuery(
        "select id, "
            + "first_name, "
            + "last_name, "
            + "email, "
            + "(select count(*) from posts where author_id = authors.id) as posts "
            + "from authors")
    // .setColumnNames("a", "b", "c", "d") // Override column names inferred from the query
    // .setPrimaryKeys("a") // Specify the primary key columns, defaults to the first column
    // .setDefaultDatabase("test") // Default TiDB database to use, defaults to that specified by JDBC URL
    .setTargetTable("author_posts") // TiFlink will automatically create the table if not exist
    // .setTargetTable("test", "author_posts") // It is possible to specify the full table path
    .setParallelism(3) // Parallelism of the Flink Job
    .setCheckpointInterval(1000) // Checkpoint interval in milliseconds. This interval determines data refresh rate
    .setDropOldTable(true) // If TiFlink should drop old target table on start
    .setForceNewTable(true) // If to throw an error if the target table already exists
    .build()
    .start(); // Start the app
Consistency of materialized views (stream processing systems)
Current mainstream materialized view (stream processing) systems mainly provide eventual consistency. In other words, although the result eventually converges to a consistent state, end users may still query inconsistent intermediate results while processing is in progress. Eventual consistency turns out to be sufficient for many applications, so is stronger consistency really needed? And what does consistency have to do with Flink’s Exactly Once semantics? Some background is necessary.
ACID
ACID is a basic concept in databases. Generally, the database that produces the CDC log already guarantees these four properties, but some of them can be broken when the CDC data is processed as a stream. The most typical case is the loss of Atomicity. A change made by one transaction may cover multiple records in the CDC log, and if the stream processing system processes them row by row, atomicity is broken: a user querying the result set may see an incomplete transaction. A typical case is as follows:
Change Log and atomicity of transactions
In the case above, we have an accounts table with transfer operations between accounts; since a transfer modifies multiple rows, it tends to produce multiple CDC records. Suppose we define a materialized view in SQL that computes the sum of all account balances:
SELECT SUM(balance) FROM ACCOUNTS;
Obviously, if the table only ever sees transfers between accounts, this query should always return the same constant. However, because current stream processing systems cannot preserve the atomicity of transactions, the result of this query may fluctuate constantly. In fact, on a source table that is concurrently modified all the time, the fluctuation can even be unbounded.
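To make the fluctuation concrete, here is a minimal self-contained sketch (plain Java with a hypothetical RowChange type, not TiFlink code) of what a reader observes when the two row changes of a single transfer are applied one at a time:

import java.util.List;

// Hypothetical row-change record; TiFlink's actual internal types differ.
record RowChange(String account, long oldBalance, long newBalance) {}

public class AtomicityExample {
    public static void main(String[] args) {
        // A single transfer of 100 from A to B is one transaction,
        // but it shows up in the CDC log as two independent row changes.
        List<RowChange> transferTxn = List.of(
            new RowChange("A", 1000, 900),
            new RowChange("B", 1000, 1100));

        long sum = 2000; // SUM(balance) before the transfer
        for (RowChange change : transferTxn) {
            sum += change.newBalance() - change.oldBalance();
            // A query answered between the two iterations sees 1900,
            // a total that never existed in the source database.
            System.out.println("SUM(balance) observed by a reader: " + sum);
        }
    }
}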
Under an eventually consistent model, the query result will converge to the correct value after some time, but a materialized view without atomicity guarantees still limits the applicable scenarios. Suppose I want to build a tool that raises an alarm whenever the query result deviates too much; I may then receive many false alarms, even though nothing is wrong on the database side and the deviation comes entirely from the stream processing system.
Another way atomicity breaks in a distributed system is when the side effects of a transaction are spread across multiple nodes. If the commit is not made distributed, for example via 2PC, atomicity is also broken: changes on some nodes (partitions) become visible before others, producing inconsistencies.
Linear consistency
Unlike CDC logs produced by a standalone database (such as MySQL’s Binlog), logs produced by a distributed database such as TiDB also raise linear consistency issues. In our scenario, the problem is that, for a sequence of operations performed from the user’s point of view, the side effects (log records) may be processed by the stream processing system in a different order because of message delivery delays. Suppose we have two tables, ORDERS and PAYMENTS, and a user must create an order before paying for it, so the result of the following query must be non-negative:
WITH order_amount AS (SELECT SUM(amount) AS total FROM ORDERS),
     payment_amount AS (SELECT SUM(amount) AS total FROM PAYMENTS)
SELECT order_amount.total - payment_amount.total
FROM order_amount, payment_amount;
However, because the ORDERS table and the PAYMENTS table are stored on different nodes, the stream processing system may consume them at different speeds. That is, it may have already seen a payment record while the corresponding order record has not yet arrived, so a negative result may be observed for the above query.
Stream processing systems can use Watermarks to align the processing progress of different tables, but this does not avoid the linear consistency problem above. A Watermark only requires that all records with timestamps smaller than it have arrived; it does not guarantee that no record with a larger timestamp has arrived. In other words, even if ORDERS and PAYMENTS appear to have the same Watermark, PAYMENTS may already have emitted some records that arrived early. Therefore a Watermark alone cannot enforce linear consistency; it must cooperate with the source database’s timestamp generation and message delivery mechanisms.
The need for stronger consistency
Although eventual consistency is sufficient in many scenarios, it still has a number of problems:
- Misleading users: because users lack knowledge about consistency, or hold misunderstandings about it, they may make decisions based on query results that have not yet converged. Given that most relational databases default to strong consistency, this situation should be avoided.
- Poor observability: since eventual consistency gives no bound on convergence time, and taking linear consistency into account, it is hard to define metrics such as latency, data freshness, and throughput for a stream processing system. For example, a user might see the result of a JOIN between the current snapshot of table A and a snapshot of table B from ten minutes ago; how should the latency of that query result be defined?
- Some requirements cannot be fully implemented: as mentioned above, some alerting requirements either cannot be implemented at all or have to be delayed because of inconsistent intermediate states; otherwise users must accept a higher false positive rate.
In fact, the lack of stronger consistency also makes it hard for some operations, especially DDL-like operations, to reuse previously computed results. Judging from the history of relational and NoSQL databases, we believe that today’s mainstream eventual consistency is only a stopgap imposed by the current state of technology; as the relevant theory and engineering mature, stronger consistency will gradually become the mainstream of stream processing systems.
Introduction to the technical solution
Here’s a closer look at TiFlink’s technical solution and how it implements strongly consistent materialized view (StreamSQL) maintenance.
TiKV and Flink
Although this is a TiDB Hackathon project, and therefore TiDB/TiKV-related components would inevitably be chosen, TiKV as the intermediate storage layer of a materialized view system has, in my opinion, a number of outstanding advantages:
- TiKV is a relatively mature distributed KV store, and a distributed environment is a scenario the next generation of materialized view systems must support. With TiKV’s accompanying Java client, we can operate on it conveniently. Meanwhile, TiDB itself, as an HTAP system, provides exactly the playground needed for materialized view requirements.
- TiKV provides transactions and MVCC based on the Percolator model, which is the foundation of TiFlink’s strongly consistent stream processing. As will be seen below, TiFlink’s writes to TiKV mainly take the form of a continuous sequence of transactions.
- TiKV natively supports CDC log output. In fact, the TiCDC component uses this feature to export CDC logs. In TiFlink, to achieve batch-stream unification and simplify the system, we chose to call TiKV’s CDC gRPC interface directly, giving up some of the features TiCDC provides.
Our initial idea was to integrate the computation directly into TiKV; choosing Flink was the result of further reflection during the competition. The main advantages of Flink are:
- Flink is currently the most mature stateful stream processing system on the market, with strong expressiveness for processing tasks and rich semantics. In particular, its StreamSQL implementation supports batch-stream unification, which lets us concentrate on the features we care about, such as strong consistency.
- Flink has fairly complete support for Watermarks, and we found that its Checkpoint-based Exactly Once delivery semantics can easily be combined with TiKV to implement transactional processing. In fact, some of Flink’s own Sinks that support Two Phase Commit also commit in coordination with Checkpoints.
- Flink’s stream processing (StreamSQL in particular) is itself based on materialized view theory, and newer versions provide the DynamicTable interface to make it easy to feed external change logs into the system. It already supports INSERT, DELETE, UPDATE, and many other CDC operations.
Of course, choosing a heterogeneous architecture like TiKV+Flink also introduces some problems, such as mismatched SQL dialects and UDFs that cannot be shared. In TiFlink, Flink’s SQL system and UDFs are used as a kind of plug-in system on top of TiKV, while TiFlink also provides a convenient table creation feature.
Implementation of strongly consistent materialized views
This section describes how TiFlink implements a strong consistency level, Stale Snapshot Isolation, on top of TiDB/TiKV. At this isolation level, a querier always reads a consistent snapshot from some point in the past. Traditional Snapshot Isolation requires that a read at time T observes exactly the transactions whose commit time is less than T; Stale Snapshot Isolation only guarantees that all transactions committed before T − Δt are observed.
The simplest way to implement a strongly consistent materialized view on a distributed database with transaction support, such as TiDB, is to update the view with one transaction after another. Each transaction reads a consistent snapshot when it starts, and updating the materialized view with a distributed transaction is itself a strongly consistent operation with ACID properties, so consistency is guaranteed.
Update materialized views with continuous transactions
To combine Flink with such a mechanism and achieve incremental maintenance, we take advantage of several features TiKV already provides:
- TiKV assigns timestamps to all operations through a Timestamp Oracle, so although it is a distributed system, the transaction timestamps in the CDC logs it produces are actually ordered.
- TiKV nodes (Regions) can produce continuous incremental change logs that contain the raw transaction information, including its timestamps.
- TiKV’s incremental log periodically produces Resolved Timestamps stating that the corresponding Region will no longer produce messages with older timestamps, which makes them a natural fit for Watermarks (see the sketch after this list).
- TiKV provides distributed transactions, which allow us to control the visibility of a batch of changes.
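As a hedged illustration of how a Resolved Timestamp maps onto Flink’s Watermark contract (a hypothetical handler class; the actual TiFlink source connector is more involved):

import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;

// Hypothetical event handler: how a TiKV resolved timestamp could be mapped
// to a Flink watermark in a legacy SourceFunction-based connector.
class ResolvedTsHandler<T> {
    private final SourceFunction.SourceContext<T> ctx;

    ResolvedTsHandler(SourceFunction.SourceContext<T> ctx) {
        this.ctx = ctx;
    }

    void onResolvedTs(long resolvedTs) {
        // A resolved timestamp promises that the Region will emit no more
        // changes with a smaller commit timestamp, which matches the
        // contract of a Flink watermark.
        ctx.emitWatermark(new Watermark(resolvedTs));
    }
}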
Therefore, the basic implementation idea of TiFlink is:
- Using batch-stream unification, read a snapshot of the source tables at a single global timestamp to obtain a consistent view of all source tables.
- Switch to consuming the incremental log and use Flink’s DynamicTable interface to incrementally maintain and output the materialized view.
- Commit the changes at a certain pace, so that each batch of changes is written to the target table in one atomic transaction, producing one update of the materialized view after another.
The key to the above is coordinating all nodes to complete a distributed transaction together, so we first need to introduce how TiKV executes distributed transactions.
Distributed transactions for TiKV
TiKV’s distributed transactions are based on the well-known Percolator model. The Percolator model requires the underlying KV store to provide MVCC, atomic single-row reads and writes, and optimistic concurrency control (OCC). On this basis, a transaction completes through the following steps (a code sketch follows the list):
- Choose a Primary Key and a start timestamp, and Prewrite the primary key row.
- Prewrite the other rows as Secondary Keys; each of them points to the primary key and carries the same start timestamp.
- After all nodes finish Prewrite, the transaction can be committed by committing the primary key with a Commit timestamp.
- Once the primary key is committed, the transaction is effectively committed; to speed up reads, the secondary keys on different nodes can then be committed and cleaned up concurrently, after which all rows become visible.
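Sketched in code, the commit flow above looks roughly like this (a hypothetical RowStore interface rather than the real TiKV Java client; conflict handling and rollback are omitted):

import java.util.List;

// Hypothetical single-row-atomic KV interface used only for illustration.
interface RowStore {
    boolean prewrite(byte[] key, byte[] value, byte[] primaryKey, long startTs);
    boolean commitPrimary(byte[] primaryKey, long startTs, long commitTs);
    void commitSecondary(byte[] key, long startTs, long commitTs);
}

class PercolatorTxnSketch {
    // values[0] belongs to the primary key, values[i + 1] to secondaries.get(i).
    void commit(RowStore store, byte[] primary, List<byte[]> secondaries,
                byte[][] values, long startTs, long commitTs) {
        // 1. Prewrite the primary key, then all secondary keys; every secondary
        //    lock points back to the primary and carries the start timestamp.
        if (!store.prewrite(primary, values[0], primary, startTs)) {
            throw new IllegalStateException("prewrite conflict on primary");
        }
        for (int i = 0; i < secondaries.size(); i++) {
            if (!store.prewrite(secondaries.get(i), values[i + 1], primary, startTs)) {
                throw new IllegalStateException("prewrite conflict on secondary");
            }
        }
        // 2. Committing the primary key is the single atomic "commit point".
        if (!store.commitPrimary(primary, startTs, commitTs)) {
            throw new IllegalStateException("commit conflict on primary");
        }
        // 3. Secondary keys can be committed lazily and concurrently afterwards;
        //    readers that find an uncommitted secondary lock consult the primary.
        for (byte[] key : secondaries) {
            store.commitSecondary(key, startTs, commitTs);
        }
    }
}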
The distributed transaction above works because committing the primary key is atomic, and whether the secondary keys scattered across other nodes are committed successfully depends entirely on the primary key. So when a reader encounters a Prewrite row that has not been committed yet, it checks whether the primary key has been committed; readers also use the Commit timestamp to decide whether a row is visible. If a Cleanup operation fails halfway, it can be finished by a subsequent reader.
To implement Snapshot Isolation, Percolator requires writers to check concurrent Prewrite records at write time and make sure their timestamps satisfy certain constraints before the transaction can commit; essentially, transactions whose write sets overlap cannot commit at the same time. In our scenario, we assume the materialized view has a single writer and the transactions are sequential, so this is not a concern.
Once TiKV’s distributed transaction mechanism is understood, the next question is how to combine it with Flink. In TiFlink, we use the Checkpoint mechanism to achieve globally consistent transaction commits.
Use Flink for distributed transaction commits
As introduced above, TiKV’s distributed transaction commit can be abstracted as a 2PC. Flink itself provides a Sink that implements 2PC, but it cannot be used directly in our scenario, because the Percolator model requires a globally consistent start timestamp and commit timestamp. Simply implementing 2PC on the Sink side is also not enough to reach a strongly consistent isolation level: the Source side must be coordinated as well, so that each transaction reads exactly the required slice of the incremental log.
Fortunately, Flink’s 2PC commit mechanism is itself driven by Checkpoints: when a Sink receives a Checkpoint request, it does the work needed to commit. Inspired by this, we can implement a pair of Source and Sink that share transaction information keyed by the Checkpoint ID and complete the 2PC along with the Checkpoint process. To let different nodes agree on the transaction information (timestamps, primary key), a global coordinator has to be introduced. The interfaces of the transaction and the global coordinator are defined as follows:
public interface Transaction {
  enum Status {
    NEW, PREWRITE, COMMITTED, ABORTED
  }

  long getCheckpointId();

  long getStartTs();

  long getCommitTs();

  byte[] getPrimaryKey();

  Status getStatus();
}

public interface Coordinator extends AutoCloseable, Serializable {
  Transaction openTransaction(long checkpointId);

  Transaction prewriteTransaction(long checkpointId, long tableId);

  Transaction commitTransaction(long checkpointId);

  Transaction abortTransaction(long checkpointId);
}
With these interfaces, each Source and Sink node can use the CheckpointID to open a transaction or obtain its metadata; the coordinator is responsible for allocating the primary key and maintaining the state of the transaction. For convenience, the Commit of the primary key at transaction commit time is also executed in the coordinator. There are many ways to implement the coordinator; TiFlink currently uses the simplest one, a gRPC service started in the JobManager process. A distributed coordinator based on PD (etcd) or on TiKV itself would also be possible.
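As a rough illustration of how a Sink task could drive this checkpoint-coordinated 2PC using the Coordinator above (class and method names other than the Coordinator/Transaction interfaces are illustrative, not the actual TiFlink sink):

// Schematic sink-side usage of the Coordinator: prewrite rows under the
// transaction of the current checkpoint, commit when the checkpoint completes.
class TransactionalSinkTaskSketch {
    private final Coordinator coordinator;

    TransactionalSinkTaskSketch(Coordinator coordinator) {
        this.coordinator = coordinator;
    }

    // Called for every incoming row: prewrite it under the transaction
    // associated with the current checkpoint.
    void onRow(long currentCheckpointId, long tableId, byte[] key, byte[] value) {
        Transaction txn = coordinator.prewriteTransaction(currentCheckpointId, tableId);
        prewriteToTiKV(txn.getStartTs(), txn.getPrimaryKey(), key, value);
    }

    // Called when the checkpoint completes on all tasks: the coordinator
    // commits the primary key, which atomically publishes this batch of changes.
    void onCheckpointComplete(long checkpointId) {
        Transaction committed = coordinator.commitTransaction(checkpointId);
        cleanUpSecondaryKeysAsync(committed);
    }

    private void prewriteToTiKV(long startTs, byte[] primaryKey, byte[] key, byte[] value) {
        // ... issue a Percolator prewrite for (key, value) pointing at primaryKey ...
    }

    private void cleanUpSecondaryKeysAsync(Transaction txn) {
        // ... commit and clean up secondary keys in a background thread ...
    }
}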
Coordinated execution of transactions with Checkpoint
The figure above shows how distributed transactions and Checkpoint execution are coordinated in Flink. The detailed flow of one transaction is as follows (a Source-side code sketch follows the list):
- The Source first receives incremental logs from TiKV and caches them together with their timestamps, waiting for the transaction to start.
- When a Checkpoint starts, the Source receives the signal first; the Checkpoint handling on the Source side and the log receiving service run on different threads.
- The Checkpoint handler obtains the current transaction information from the global coordinator (or opens a new transaction). In the distributed case, a transaction for a given CheckpointID is opened only once.
- Once the transaction’s start timestamp is known, the Source node starts emitting the committed changes in its cache whose timestamps are smaller than that start timestamp to the downstream compute nodes. The Source node also emits Watermarks at this point.
- After all Source nodes finish the above, the Checkpoint completes on the Source side and continues to propagate downstream. By Flink’s Checkpoint mechanism, when the Checkpoint reaches a node, all events emitted before it have already been consumed by that node.
- When the Checkpoint reaches the Sink, all events sent before it have already been Prewritten, and the commit process can begin. The Sink persists the transaction information in its internal state so that it can recover after a failure. After all Sink nodes finish this step, the coordinator’s Commit method is called in the Checkpoint-completion callback to commit the transaction.
- After the transaction is committed, the Sink starts a thread to clean up the Secondary Keys and begins a new transaction.
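A schematic sketch of the Source-side behaviour in the first few steps above (illustrative class and method names, not the actual TiFlink source; the changes returned here would then be emitted downstream, followed by a Watermark):

import java.util.ArrayList;
import java.util.List;

// Hypothetical source task: buffer CDC events, then flush the ones that belong
// to the transaction opened for the current checkpoint.
class CachedSourceTaskSketch {
    record Change(long commitTs, byte[] key, byte[] value) {}

    private final Coordinator coordinator;
    private final List<Change> cache = new ArrayList<>();

    CachedSourceTaskSketch(Coordinator coordinator) {
        this.coordinator = coordinator;
    }

    // Step 1: incremental CDC events are buffered together with their timestamps.
    synchronized void onCdcEvent(Change change) {
        cache.add(change);
    }

    // Steps 2-4: when a checkpoint arrives, fetch (or open) the transaction for
    // this CheckpointID and flush every committed change below its start timestamp.
    synchronized List<Change> onCheckpoint(long checkpointId) {
        Transaction txn = coordinator.openTransaction(checkpointId);
        List<Change> toEmit = new ArrayList<>();
        cache.removeIf(change -> {
            if (change.commitTs() < txn.getStartTs()) {
                toEmit.add(change);  // belongs to this transaction's snapshot
                return true;
            }
            return false;            // newer changes stay cached for the next transaction
        });
        return toEmit;
    }
}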
Note that the Sink may start receiving data to write before the first Checkpoint, and at that point it has no transaction information yet. To solve this, TiFlink opens an initial transaction with CheckpointID 0 when the task starts, which is used to commit the very first writes; this transaction is actually committed when the Checkpoint with CheckpointID = 1 completes. In this staggered fashion, transactions and Checkpoints are executed in coordination. The diagram below shows the architecture of a complete TiFlink task, including the coordinator:
TiFlink’s system architecture
Based on the above design, we obtain a materialized view on TiKV with stale (delayed) snapshot isolation.
Other Design considerations
It is well known that KSQL is another popular stream processing system besides Flink. It is directly integrated with the Kafka message queue, so users do not have to deploy two separate systems, which makes it attractive to some users, and many of them use KSQL to implement requirements such as materialized views. However, in my opinion, a stream processing system so tightly coupled to a message queue is not well suited to the materialized view scenario.
KSQL is representative of log-oriented data processing systems: the source of truth is the log, and every table is merely a view constructed by consuming the log for convenient querying. Such systems have the advantages of a simple model, ease of implementation, and long-term retention of the log. By contrast, in table-oriented data processing systems, a category that includes MySQL and TiDB/TiKV, all changes are applied to a table data structure; logs are still generated, but changes to the table and to the log are usually coordinated. Here the log mainly serves persistence and transactions and tends not to be retained for long.
Compared with log-oriented systems, table-oriented systems are more complex on the write and transaction processing path, but they have an advantage in scalability. Fundamentally, this is because data in a log-oriented system is stored as logs, so scaling out often requires costly re-hashing and rebalancing is harder. In a table-oriented system, data is mainly stored as tables, which can be kept ordered by certain columns, making Range-based splitting, merging, and rebalancing straightforward.
Personally, I think that in the batch-stream unified materialized view scenario, retaining logs for a long time does not bring much benefit (data can always be recovered from a snapshot of the source tables), whereas being able to scale the processing tasks and views as the business grows matters a great deal. From this perspective, a table-oriented system seems better suited as the storage backing materialized view requirements.
Of course, Region merges and splits that happen while the incremental log is being consumed in real time are tricky to handle: TiKV returns a gRPC error in that case. TiFlink currently uses a relatively simple static mapping to handle the relationship between tasks and partitions; a more reasonable solution can be considered in the future.
Conclusion
This article introduced the basic principle of using Flink to implement strongly consistent materialized views on TiKV. These ideas have been basically implemented in the TiFlink system, and readers are welcome to try it. All of the above discussion relies on the guarantee of Flink’s eventual consistency model, namely that the result of a stream computation depends only on the consumed events and their order within their own stream, not on the order in which they arrive in the system or on the relative order between different streams.
The current TiFlink system still has many points worth improving, such as:
- Support for non-integer and composite primary keys
- Better mapping of TiKV Regions to Flink tasks
- Better fault tolerance and cleanup of TiKV transactions when a task is interrupted
- Better unit test coverage
If you are interested in TiFlink, please try it out and give feedback, and it would be great if you could contribute code to help improve the system.
Thinking about the consistency of materialized view systems is one of my main takeaways this year. In fact, we did not pay attention to this aspect at first, and only through repeated discussion did we realize it is a valuable and challenging problem. Through the implementation of TiFlink, the feasibility of the approach above for achieving delayed snapshot consistency has been basically verified. Of course, given the limits of my own ability, if there is any mistake, discussion is welcome.
Finally, if the arguments above about delayed snapshot consistency hold, a way to implement true snapshot isolation follows naturally. Can you think of it?