Author: Xu Bangjiang (Xuejin)
A video replay of the Flink CDC sharing session is available at: www.bilibili.com/video/BV1jT…
Preface
CDC (Change Data Capture) is a technology for capturing change data from databases. Flink has supported processing CDC data since version 1.11, and it is now a very mature solution for handling change data.
Flink CDC Connectors, the core component of Flink CDC, are a group of source connectors responsible for reading full historical data and incremental change data from databases such as MySQL, PostgreSQL, Oracle, and MongoDB. Flink CDC Connectors is an independent open source project. Since it was open sourced in July last year, the community has kept up a fairly rapid pace of development, averaging one release every two months. Attention from the open source community keeps rising, and more and more users are using Flink CDC to quickly build real-time data warehouses and data lakes.
Flink CDC maintainer Xu Bangjiang first shared the design of Flink CDC 2.0 at the Flink Meetup in Beijing in July. Then, in August, the Flink CDC community released version 2.0, which addressed a number of pain points in production practice and quickly grew the Flink CDC user base.
Alongside the rapid expansion of the user base, the community's developer base is also growing quickly. Many developers at home and abroad have joined the open source work of the Flink CDC community, including developers from Cloudera in North America and from Vinted and Ververica in Europe. Domestic developers are even more active, coming from Internet companies such as Tencent, Alibaba, and ByteDance, as well as startups and traditional enterprises such as XTransfer and Xinhua Winshare. In addition, many cloud vendors at home and abroad have integrated Flink CDC into their stream computing products, enabling more users to experience the power and convenience of Flink CDC.
I. Flink CDC 2.1 Overview
Today, the Flink CDC community is pleased to announce the official release of Flink CDC 2.1: github.com/ververica/f…
This article walks you through the major improvements and core features of Flink CDC 2.1 in about 10 minutes. Version 2.1 includes 100+ PRs from 23 contributors, focuses on improving the performance and production stability of the MySQL CDC connector, and introduces the new Oracle CDC connector and MongoDB CDC connector:
- MySQL CDC supports very large tables with tens of billions of rows, supports all MySQL data types, and improves stability through optimizations such as connection pool reuse. It also provides a DataStream API that supports lock-free, concurrent reads, which can be used to build a whole-database synchronization link;
- Added the Oracle CDC connector, which supports fetching full historical data and incremental change data from Oracle databases;
- Added the MongoDB CDC connector, which supports fetching full historical data and incremental change data from MongoDB databases;
- All connectors support the metadata column feature. Users can access meta information such as the database name, table name, and data change time via SQL, which is very useful for data integration in sharded database and table scenarios;
- Enriched the Flink CDC getting-started documentation and added end-to-end hands-on tutorials for a variety of scenarios.
II. MySQL CDC connector
In Flink CDC 2.0, the MySQL CDC connector provided advanced features such as the lock-free algorithm, concurrent reading, and resumable reads from breakpoints, which solved many pain points in production practice. While rolling the connector out, we worked with users to resolve many production issues and also developed several high-quality features that users urgently needed. The improvements to the MySQL CDC connector in Flink CDC 2.1 fall into two categories: stability improvements and feature enhancements.
1. Stability improvements
- A dynamic chunking algorithm is introduced for different primary key distributions
For scenarios such as non-numeric primary keys, snowflake IDs, sparse primary keys, and composite primary keys, the connector dynamically analyzes how evenly the primary keys are distributed in the source table and automatically calculates the chunk size based on that distribution, making chunking more reasonable and chunk calculation faster. The dynamic chunking algorithm solves problems such as too many chunks in the sparse primary key scenario and oversized chunks in the composite primary key scenario, keeping the number of rows in each chunk within the chunk size specified by the user (see the sketch below). Users can thus control the chunk size and chunk count simply through the chunk size setting, without worrying about the primary key type.
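For reference, the chunk size is configured through a table option of the MySQL CDC connector. Below is a minimal sketch of such a declaration; the hostname, credentials, and table names are placeholders, and 8096 is the connector's documented default chunk size:

```sql
-- MySQL CDC source with an explicit chunk size; the dynamic chunking
-- algorithm keeps each chunk close to this row count regardless of
-- how the primary key values are distributed.
CREATE TABLE orders_source (
    order_id BIGINT,
    order_status STRING,
    PRIMARY KEY (order_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'mydb',
    'table-name' = 'orders',
    'scan.incremental.snapshot.chunk.size' = '8096'
);
```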
- Support for very large tables with tens of billions of rows
When a table is extremely large, delivery of the binlog split may fail. The reason is that the binlog split must carry the information of all snapshot splits, and an extremely large table produces a huge number of snapshot splits, so when the SourceCoordinator sends the binlog split to a SourceReader node, the split size exceeds the maximum message size supported by the RPC framework and delivery fails. Although this can be alleviated by tuning the RPC framework parameters, it cannot be completely solved that way. In version 2.1, the problem is solved by dividing the snapshot split information into groups and sending it to the binlog split group by group instead of all at once.
- A connection pool is introduced to manage database connections, improving stability
Introducing a connection pool to manage database connections reduces the number of database connections and avoids connection leaks in extreme scenarios.
- When the schemas of different databases and tables are inconsistent, missing fields are automatically filled with NULL values
2. Feature enhancements
- Support for all MySQL data types
This includes complex types such as enumeration types, array types, and geographic information (spatial) types.
- Support for metadata columns
In Flink DDL, users can access meta information such as the database name (database_name), table name (table_name), and change time (op_ts) of a record by declaring metadata columns such as db_name STRING METADATA FROM 'database_name' (see the sketch below). This is very useful for data integration in sharded database and table scenarios.
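A minimal sketch of a DDL using these metadata columns; the metadata keys are the ones named above, while the connection details, table name pattern, and business columns are placeholders (the exact data type of op_ts may vary slightly by connector version):

```sql
-- Merge sharded tables user_db.users_1, user_db.users_2, ... into one
-- changelog stream and expose the source database name, table name,
-- and change time as extra columns.
CREATE TABLE mysql_users (
    db_name STRING METADATA FROM 'database_name' VIRTUAL,
    tbl_name STRING METADATA FROM 'table_name' VIRTUAL,
    operation_ts TIMESTAMP_LTZ(3) METADATA FROM 'op_ts' VIRTUAL,
    user_id BIGINT,
    user_name STRING,
    PRIMARY KEY (user_id) NOT ENFORCED
) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'user_db',
    'table-name' = 'users_[0-9]+'
);
```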
- DataStream API that supports concurrent reads
In version 2.0, features such as the lock-free algorithm and concurrent reading were only available through the SQL API, not the DataStream API. In version 2.1, the DataStream API also supports creating the source through MySqlSourceBuilder. Users can capture data from multiple tables at the same time to build a whole-database synchronization link, and can also capture schema changes via MySqlSourceBuilder#includeSchemaChanges.
- Added the currentFetchEventTimeLag, currentEmitEventTimeLag, and sourceIdleTime monitoring metrics
These metrics follow the connector metrics specification defined in FLIP-33 [1]; refer to FLIP-33 for the meaning of each metric. The currentEmitEventTimeLag metric records the difference between the point in time when the source sends a record to the downstream node and the point in time when the record was generated in the database, measuring the delay from data being generated in the database to leaving the source node. Users can use this metric to determine whether the source has entered the binlog reading phase:
- When the metric is 0, the full historical data is still being read;
- When the metric is greater than 0, the source has entered the binlog reading phase.
III. Add Oracle CDC connector
Oracle is also a widely used database. The Oracle CDC connector supports capturing and recording row-level changes that occur in an Oracle database server. It works by using the LogMiner [2] tool provided by Oracle or the native Oracle XStream API [3] to fetch change data from the Oracle database.
LogMiner is an analysis tool provided by Oracle that parses Oracle redo log files and turns the database's data change logs into change event output. When using LogMiner, the Oracle server places strict resource limits on the process that parses the log files, so parsing is slow for tables with very large data volumes; the advantage is that LogMiner is free to use.
The XStream API is an internal interface that Oracle provides for Oracle GoldenGate (OGG). Clients can use the XStream API to retrieve change events efficiently, because it reads change data directly from memory in the Oracle server, saving the overhead of writing the data to redo log files and then parsing those files. The drawback is that an Oracle GoldenGate (OGG) license must be purchased.
The Oracle CDC connector supports both the LogMiner and XStream APIs for capturing change events and in theory supports all Oracle versions; Flink CDC has currently been tested against Oracle 11, 12, and 19. Using the Oracle CDC connector, users can capture change data in an Oracle database in real time by declaring Flink SQL along the lines of the following:
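A minimal sketch of such a declaration; the hostname, credentials, database, schema, and table names are placeholders:

```sql
-- Oracle CDC source: captures full data plus incremental changes
-- from the inventory.products table.
CREATE TABLE products (
    ID INT NOT NULL,
    NAME STRING,
    DESCRIPTION STRING,
    WEIGHT DECIMAL(10, 3),
    PRIMARY KEY (ID) NOT ENFORCED
) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = 'localhost',
    'port' = '1521',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database-name' = 'XE',
    'schema-name' = 'inventory',
    'table-name' = 'products'
);
```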
Using Flink's rich surrounding ecosystem, users can easily write the captured data to various downstream stores such as message queues, data warehouses, and data lakes.
The Oracle CDC connector shields users from the underlying CDC details; the entire real-time synchronization link needs only a few lines of Flink SQL, with no Java development, to capture and deliver Oracle change data in real time.
In addition, the Oracle CDC connector provides two working modes: reading full data plus incremental change data, and reading only incremental change data. The Flink CDC framework guarantees exactly-once semantics in both modes.
IV. Add MongoDB CDC connector
The MongoDB CDC connector does not depend on Debezium and was developed independently within the Flink CDC project. It supports capturing and recording real-time change data in a MongoDB database. It relies on MongoDB replica sets [4]: using the high-availability mechanism of a MongoDB cluster, replica members can obtain the complete oplog (operation log) event stream from the primary node, and the Change Streams API provides the ability to subscribe to this oplog event stream in real time and push the change events to subscribing applications.
The update events obtained from the Change Streams API do not carry the pre-update image, so the MongoDB CDC source can only act as an upsert source. However, the Flink framework automatically appends a Changelog Normalize node to the MongoDB CDC source to fill in the pre-update image (i.e., the UPDATE_BEFORE event) and guarantee the semantic correctness of the CDC data.
Using the MongoDB CDC connector, users can capture full and incremental change data in a MongoDB database in real time simply by declaring Flink SQL along the lines of the following. With Flink's powerful integration capabilities, users can easily synchronize data from MongoDB to any downstream storage supported by Flink in real time.
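A minimal sketch of such a declaration; the hosts, credentials, database, and collection names are placeholders:

```sql
-- MongoDB CDC source: captures full data plus incremental changes
-- from the orders collection of the mgdb database.
CREATE TABLE mongodb_orders (
    _id STRING,                      -- must be declared as the primary key
    order_id BIGINT,
    order_status STRING,
    PRIMARY KEY (_id) NOT ENFORCED
) WITH (
    'connector' = 'mongodb-cdc',
    'hosts' = 'localhost:27017',
    'username' = 'flinkuser',
    'password' = 'flinkpw',
    'database' = 'mgdb',
    'collection' = 'orders'
);
```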
Throughout the data capture process, users do not need to understand the replica set mechanism and internals of MongoDB, which greatly simplifies the process and lowers the barrier to use. The MongoDB CDC connector also supports two startup modes:
- The default initial mode first synchronizes the existing full data in the collection and then synchronizes incremental change data;
- The latest-offset mode synchronizes only the incremental change data generated from the current point in time.
In addition, the MongoDB CDC connector provides a rich set of configuration and tuning parameters that can greatly improve the performance and stability of real-time links in production environments.
V. Summary and outlook
In just over a year, the Flink CDC project has achieved phenomenal growth and attention, thanks to the selfless contributions of Flink CDC open source community contributors and the positive feedback of Flink CDC users. It is this positive interaction between the two that makes the Flink CDC project healthy, and this positive interaction is the charm of the open source community.
The Flink CDC community will continue building the open source community, with three main directions in its future plans:
- Go deep on CDC technology
For example, abstract and reuse the MySQL CDC implementation so that Oracle and MongoDB can also quickly support features such as lock-free reading and concurrent reading.
- Broaden the database ecosystem
We will support more database CDC data sources, such as TiDB, DB2, and MS SQL Server.
- Polish the data integration scenarios
  - Better integrate with the downstream ecosystem of real-time data warehouses and data lakes, including Hudi, Iceberg, ClickHouse, Doris, etc.;
  - Further lower the barrier for getting CDC data into lakes and warehouses, and solve pain points such as whole-database synchronization and synchronization of table schema changes.
Thank you
Special thanks to Marton Balassi and Tamas Kiss from Cloudera for contributing the Oracle CDC connector, and to Jiabao Sun from XTransfer for contributing the MongoDB CDC connector.
List of contributors:
Gunnar Morling, Jark Wu, Jiabao Sun, Leonard Xu, MartijnVisser, Marton Balassi, Shengkai, Tamas Kiss, Tamas Zseder, Zongwen Li, dongdongking008, frey66, gongzhongqiang, ili zh, jpatel, lsy, luoyuxia, manmao, mincwang, taox, tuple, wangminchao, yangbin09
[1] cwiki.apache.org/confluence/…
[2] oracle-base.com/articles/8i…
[3] docs.oracle.com/cd/E11882_0…
[4] docs.mongodb.com/manual/repl…