This article was written by Ding Yang from the R&D Center of the Agricultural Bank of China. The author downloaded and tried Flink CDC 2.1 immediately after its release, and successfully achieved real-time data capture and performance tuning against Oracle. This article shares some key details from that trial. The main contents include:
- Unable to connect to the database
- Unable to find Oracle tables
- High data latency
- Tuning parameters to further reduce data latency
- Hidden parameters of the Debezium Oracle Connector
Flink CDC 2.1, released on November 15, 2021, adds support for Oracle by introducing a built-in Debezium component. The author downloaded this version for trial as soon as it was released and successfully achieved real-time data capture and performance tuning of Oracle. The following shares some key details from the trial process.
Note: this article focuses on hands-on troubleshooting experience and the internal implementation principles, so it does not cover the basic usage of Flink CDC or its built-in Debezium module. For basic usage, parameters, and so on, readers can refer to the following:
Flink CDC:
ververica.github.io/flink-cdc-c…
Debezium:
debezium.io/documentati…
Trial Environment:
Oracle: 11.2.0.4.0 (RAC Deployment)
Flink: 1.13.1
Hadoop: 3.2.1
Deployed in Flink-on-YARN mode
1. Unable to connect to the database
According to the official documentation, enter the following statement in the Flink SQL CLI:
CREATE TABLE TEST (A string) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.230.179.125',
    'port' = '1521',
    'username' = 'myname',
    'password' = '***',
    'database-name' = 'MY_SERVICE_NAME',
    'schema-name' = 'MY_SCHEMA',
    'table-name' = 'TEST'
);
Then, running select * from TEST shows that the Oracle database cannot be connected:
[ERROR] Could not execute SQL statement. Reason:
oracle.net.ns.NetException: Listener refused the connection with the following error:
ORA-12505, TNS:listener does not currently know of SID given in connect descriptor
From the error, it appears that Flink CDC misidentified MY_SERVICE_NAME (the Oracle service name) provided in the connection information as a SID. Reading the Oracle Connector source code of Flink CDC confirms this: in com.ververica.cdc.connectors.oracle.OracleValidator, the Oracle connection code is as follows:
public static Connection openConnection(Properties properties) throws SQLException {
    DriverManager.registerDriver(new oracle.jdbc.OracleDriver());
    String hostname = properties.getProperty("database.hostname");
    String port = properties.getProperty("database.port");
    String dbname = properties.getProperty("database.dbname");
    String userName = properties.getProperty("database.user");
    String userpwd = properties.getProperty("database.password");
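    // Note: "host:port:dbname" below is the SID-style JDBC URL format;
    // a Service Name connection would use "host:port/dbname" instead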
    return DriverManager.getConnection(
            "jdbc:oracle:thin:@" + hostname + ":" + port + ":" + dbname, userName, userpwd);
}
As can be seen, the current version of Flink CDC does not distinguish between the SID and Service Name connection modes; the SID connection mode is hard-coded (port and dbname are separated by ":").
Oracle introduced the concept of Service Name as early as Oracle 8i to support cluster (RAC) deployment of databases. A Service Name serves as a logical concept for a database, unifying the connections to that database's different SID instances. Accordingly, the following two approaches can be considered:
- In the Flink CDC CREATE TABLE statement, use one of the SIDs in place of the Service Name for database-name. This approach solves the connection problem, but it cannot adapt to the mainstream real-world scenario of Oracle cluster deployment;
- Modify the source code. Specifically, in a custom build, rewrite the com.ververica.cdc.connectors.oracle.OracleValidator method above and change the connection mode to Service Name (that is, separate port and dbname with "/"):
  "jdbc:oracle:thin:@" + hostname + ":" + port + "/" + dbname, userName, userpwd);
The author adopted the second approach, which enables a normal connection to the database while retaining the ability to use the Oracle Service Name feature.
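For completeness, the first approach amounts to a CREATE statement like the following sketch, where MY_SID is a placeholder for one of the RAC instance SIDs; the unpatched SID-style connection string then resolves correctly, but only to that single instance:

CREATE TABLE TEST (A string) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.230.179.125',
    'port' = '1521',
    'username' = 'myname',
    'password' = '***',
    -- one instance's SID instead of the cluster-wide Service Name
    'database-name' = 'MY_SID',
    'schema-name' = 'MY_SCHEMA',
    'table-name' = 'TEST'
);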
This issue has been submitted as Flink CDC Issue 701:
github.com/ververica/f…
2. Unable to find Oracle tables
If you run the select * from TEST command again, the following error message is displayed:
[ERROR] Could not execute SQL statement. Reason:
io.debezium.DebeziumException: Supplemental logging not configured for table MY_SERVICE_NAME.MY_SCHEMA.test. Use command: ALTER TABLE MY_SCHEMA.test ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS
The table mentioned in the error message is MY_SERVICE_NAME.MY_SCHEMA.test. Why are the database name and schema name uppercase, but the table name lowercase?
Note that the error is reported by the io.debezium package. Analyzing that package's source code (according to Flink CDC's pom.xml, the Debezium version currently used is 1.5.4), we find the following code in io.debezium.relational.Tables:
private TableId toLowerCaseIfNeeded(TableId tableId) {
    return tableIdCaseInsensitive ? tableId.toLowercase() : tableId;
}
As you can see, Debezium's developers interpreted "case insensitive" as "the table name needs to be converted to lowercase". This holds for PostgreSQL, MySQL, and the other databases Debezium supports. For Oracle, however, "case insensitive" means that table names are stored in uppercase in its internal metadata (see docs.oracle.com/cd/E11882_0…: "Nonquoted identifiers are not case sensitive. Oracle interprets them as uppercase").
Thus, after reading the "case insensitive" configuration, Debezium follows the code logic above and tries to read a lowercase table name, which triggers the error.
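To see Oracle's behavior concretely, consider the following sketch (plain Oracle SQL, runnable in any schema you own):

-- Oracle stores the non-quoted identifier "test" as TEST in its data dictionary
CREATE TABLE test (a VARCHAR2(10));

-- Returns one row with TABLE_NAME = 'TEST'
SELECT table_name FROM user_tables WHERE table_name = 'TEST';

-- Returns no rows: nothing is registered under the lowercase name,
-- which is essentially the lookup Debezium ends up performing
SELECT table_name FROM user_tables WHERE table_name = 'test';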
As of the latest stable version 1.7.1 and the latest development version 1.8.0, Debezium has not fixed this problem. We can bypass it in either of two ways:
- To keep Oracle's "case insensitive" feature, modify the source code directly, changing the toLowercase above to toUppercase (this is the approach the author chose);
- If you prefer not to modify the source code and do not need Oracle's "case insensitive" feature, add 'debezium.database.tablename.case.insensitive'='false' to the CREATE statement, as in the following example:
CREATE TABLE TEST (A string) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.230.179.125',
    'port' = '1521',
    'username' = 'myname',
    'password' = '***',
    'database-name' = 'MY_SERVICE_NAME',
    'schema-name' = 'MY_SCHEMA',
    'table-name' = 'TEST',
    'debezium.database.tablename.case.insensitive' = 'false'
);
The downside of this approach is that it loses Oracle's "case insensitive" feature, and the uppercase table name must be specified explicitly in 'table-name'.
Note that for the database.tablename.case.insensitive parameter, Debezium currently defaults it to true only for Oracle 11g, and to false for all other Oracle versions. So if you are not using Oracle 11g, you do not need to set this parameter, but you still need to specify the uppercase table name explicitly.
This issue has been submitted as Flink CDC Issue 702:
github.com/ververica/f…
3. High data latency
The data latency was high: it sometimes took 3 to 5 minutes to capture data changes. The Flink CDC FAQ provides a clear solution to this problem: add the following two configuration items to the CREATE statement:
'debezium.log.mining.strategy'='online_catalog',
'debezium.log.mining.continuous.mine'='true'
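For reference, adding both options to the earlier example yields a complete statement like the following (the connection details are the same placeholders as before):

CREATE TABLE TEST (A string) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.230.179.125',
    'port' = '1521',
    'username' = 'myname',
    'password' = '***',
    'database-name' = 'MY_SERVICE_NAME',
    'schema-name' = 'MY_SCHEMA',
    'table-name' = 'TEST',
    -- read the data dictionary from the online catalog rather than mining it from redo/archive logs
    'debezium.log.mining.strategy' = 'online_catalog',
    -- let Oracle keep one continuous mining session instead of re-registering logs on every iteration
    'debezium.log.mining.continuous.mine' = 'true'
);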
So why does this work? We can deepen our understanding of the tool by analyzing the source code and logs, combined with how Oracle LogMiner works.
Debezium's LogMiner extraction work happens mainly in the execute method of io.debezium.connector.oracle.logminer.LogMinerStreamingChangeEventSource. To save space, this article does not list the actual source code, but only summarizes the key process in the following flow chart; interested readers can analyze the actual source code against it:
With redo_log_catalog, DDL changes to the monitored tables can be captured, and because archive logs are saved to disk permanently, all DDL and DML operations from before a database outage can still be obtained after recovery. However, because it monitors more information than the online catalog, and entails frequent log switching and log dump operations, the cost can be staggering.
According to the author's tests, if debezium.log.mining.strategy is left at its default value redo_log_catalog, not only is the extra operation of step ① required (it takes about 30 seconds to 1 minute), but step ④ also takes 1 to 4 minutes depending on the data volume of the archive logs; and at step ⑤, the actual query of the V$LOGMNR_CONTENTS view often takes more than ten seconds to complete.
In addition, because archive logs grow rapidly on a real system, expired logs are usually deleted or dumped periodically. Since step ④ takes a long time, the author observed that, with a certain probability, the archive logs registered in step ② expire and are deleted while step ④ is still executing. The query in step ⑤ then fails to find the logs registered in step ② and reports the following error:
ORA-00308: cannot open archive log '/path/to/archive/log/...'
ORA-27037: unable to obtain file status
Generally speaking, the tables Flink CDC monitors, especially those important to business systems, do not undergo DDL operations; only DML operations need to be captured. In extreme cases, such as a database outage, a full data refresh after recovery can also ensure data consistency. Thus, the online_catalog approach is sufficient for our needs.
In addition, whether using online_catalog or the default redo_log_catalog, the logs registered in step ② can get out of sync with the logs actually needed in step ⑤. Adding the 'debezium.log.mining.continuous.mine'='true' parameter hands the work of continuously collecting logs over to Oracle itself, avoiding this problem.
After the author configured these two parameters, the data latency was generally reduced from a few minutes to about 5 seconds.
4. Tuning parameters to further reduce data latency
Steps ③ and ⑦ of the flow chart above mentioned that the SCN range monitored by LogMiner, and the sleep time, are determined according to configuration items. That process is analyzed further below, and a general methodology is given for further tuning individual tables.
By studying the getEndScn method of the io.debezium.connector.oracle.logminer.LogMinerHelper class, we can learn how Debezium adjusts the monitored SCN range and the sleep time. To help readers understand, the method is illustrated with the following flow chart:
As the flow chart shows, Debezium provides the log.mining.batch.size.* and log.mining.sleep.time.* configuration items so that the step size of each LogMiner run stays as consistent as possible with the increment of the database's own SCN. From this, it follows that:
- The settings of the log.mining.batch.size.* and log.mining.sleep.time.* parameters are related to the overall performance of the database, and are unrelated to the data changes of any single table;
- log.mining.batch.size.default is not only the initial value of the monitored SCN range, but also the threshold for adjusting it. So to adjust the monitored SCN range more flexibly, consider reducing this parameter appropriately;
- Because the sleep time is adjusted by log.mining.sleep.time.increment.ms each time the monitored SCN range is determined (according to the comparison of topScn and currentScn), to adjust the sleep time more flexibly, consider increasing this parameter appropriately;
- log.mining.batch.size.max must not be too small; otherwise there is a risk that the monitored SCN range can never catch up with the database's current SCN. For this, Debezium contains the following logic in io.debezium.connector.oracle.OracleStreamingChangeEventSourceMetrics:
if (currentBatchSize == batchSizeMax) {
    LOGGER.info("LogMiner is now using the maximum batch size {}. This could be indicative of large SCN gaps", currentBatchSize);
}
If the monitored SCN range reaches log.mining.batch.size.max, Debezium writes this reminder to its log. In practice, checking whether the Flink CDC logs contain this message tells you whether the value of log.mining.batch.size.max is reasonable.
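As a concrete starting point, these adjustments can be expressed in the CREATE statement as in the sketch below. The three values are illustrative assumptions only, and should be calibrated against your own database's SCN growth rate and the log message above:

CREATE TABLE TEST (A string) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.230.179.125',
    'port' = '1521',
    'username' = 'myname',
    'password' = '***',
    'database-name' = 'MY_SERVICE_NAME',
    'schema-name' = 'MY_SCHEMA',
    'table-name' = 'TEST',
    'debezium.log.mining.strategy' = 'online_catalog',
    'debezium.log.mining.continuous.mine' = 'true',
    -- lower initial value/threshold for finer-grained adjustment of the monitored SCN range
    'debezium.log.mining.batch.size.default' = '5000',
    -- keep enough headroom so the monitored range can catch up with the current SCN
    'debezium.log.mining.batch.size.max' = '100000',
    -- a larger increment lets the sleep time adapt faster
    'debezium.log.mining.sleep.time.increment.ms' = '500'
);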
5. Hidden parameters of the Debezium Oracle Connector
We have in fact already learned two hidden parameters above: debezium.database.tablename.case.insensitive (see section 2) and debezium.log.mining.continuous.mine (see section 3). Neither parameter is stated in Debezium's official documentation, but both can actually be used. All of the Debezium Oracle Connector's hidden parameters can be dug out of the source code in the same way.
Besides the two parameters above, the author believes the one most worth attention is log.mining.history.recorder.class. Because this parameter defaults to io.debezium.connector.oracle.logminer.NeverHistoryRecorder, an empty class, we can supply a custom class implementing the io.debezium.connector.oracle.logminer.HistoryRecorder interface when analyzing Flink CDC's behavior, achieving personalized monitoring of that behavior without modifying the source code.
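Such a recorder can then be wired in from the CREATE statement, as in the sketch below; com.example.LoggingHistoryRecorder is a hypothetical name standing for your own implementation of the HistoryRecorder interface, packaged and placed on the Flink classpath beforehand:

CREATE TABLE TEST (A string) WITH (
    'connector' = 'oracle-cdc',
    'hostname' = '10.230.179.125',
    'port' = '1521',
    'username' = 'myname',
    'password' = '***',
    'database-name' = 'MY_SERVICE_NAME',
    'schema-name' = 'MY_SCHEMA',
    'table-name' = 'TEST',
    -- hypothetical custom implementation of io.debezium.connector.oracle.logminer.HistoryRecorder
    'debezium.log.mining.history.recorder.class' = 'com.example.LoggingHistoryRecorder'
);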
Flink CDC project address: github.com/ververica/f…