- preface
- Flink consumes CDC data
- flink cdc connector
- changelog format
preface
CDC, short for Change Data Capture, captures committed changes from a database and sends them downstream for consumption. These changes include INSERT, DELETE, UPDATE, and so on.
Users can use CDC in the following scenarios:
- Real-time data synchronization: for example, synchronizing data from a MySQL database into another data store.
- Real-time materialized views over the database.
Flink consumes CDC data
In a traditional data synchronization setup, if we want to obtain database data in real time, the usual architecture is to use a third-party tool such as Canal or Debezium to collect the database change log in real time and send it to a message queue such as Kafka. The Kafka data is then consumed by other components such as Flink or Spark, which perform the computation and send the results downstream. The overall architecture is as follows:
In this architecture, Flink plays the role of the computing layer. Flink currently provides two formats for this kind of data: canal-json and debezium-json. A brief introduction to each follows.
canal format
Canal, an open-source tool from Alibaba, is widely used in China. We can use Canal to subscribe to the MySQL binlog; Canal organizes the change data of the MySQL database into its fixed JSON or protobuf format and sends it to Kafka for downstream use.
The json data format parsed by Canal is as follows:
{" data ": [{" id" : "111", "name" : "scooter", "description" : "Big 2 - wheel scooter", "weight" : "5.18"}], "database" : "inventory", "es": 1589373560000, "id": 9, "isDdl": false, "mysqlType": { "id": "INTEGER", "name": "VARCHAR (255)", "description" : "VARCHAR (512)", "weight" : "FLOAT"}, "old" : [{" weight ":" 5.15 "}], "pkNames" : [ "id" ], "sql": "", "sqlType": { "id": 4, "name": 12, "description": 12, "weight": 7 }, "table": "products", "ts": 1589373560798, "type": "UPDATE" }Copy the code
A few key fields:
- type: the operation type, such as UPDATE, INSERT, or DELETE.
- data: the data of the operation. For INSERT it is the content of the row; for UPDATE it is the row's state after the update; for DELETE it is the row's state before the deletion.
- old: an optional field. If present, it contains the content before the update; for non-update operations it is null.
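For instance, going by the field descriptions above, a DELETE event for the row shown earlier would look roughly like the following. This is a hand-written, illustrative message rather than one captured from a real Canal instance, and most metadata fields are omitted:
{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.15"
    }
  ],
  "database": "inventory",
  "table": "products",
  "pkNames": ["id"],
  "isDdl": false,
  "old": null,
  "type": "DELETE"
}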
The full semantics are as follows:
private String destination;                // corresponding canal instance or MQ topic
private String groupId;                    // group id of the MQ
private String database;                   // database or schema
private String table;                      // table name
private List<String> pkNames;              // primary key names
private Boolean isDdl;                     // whether this is a DDL statement
private String type;                       // type: INSERT / UPDATE / DELETE
private Long es;                           // binlog execution time, i.e. the DML build timestamp
private Long ts;                           // synchronization time
private String sql;                        // the executed SQL
private List<Map<String, Object>> data;    // the data list
private List<Map<String, Object>> old;     // the old data list, used for UPDATE; its size corresponds one-to-one with data
In Flink SQL, the SQL that consumes this data is as follows:
CREATE TABLE topic_products (
id BIGINT,
name STRING,
description STRING,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = 'products_binlog',
'properties.bootstrap.servers' = 'localhost:9092',
'properties.group.id' = 'testGroup',
'format' = 'canal-json' -- using canal-json as the format
)
After the table is created, we can query topic_products in Flink SQL to consume the canal change data from the Kafka topic.
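For instance, simple continuous queries over the table defined above could look like this (a sketch; the aggregation is just for illustration):
-- stream the changelog rows as they arrive
SELECT * FROM topic_products;

-- aggregations consume the changelog semantics as well,
-- e.g. keep a running average weight per product name
SELECT name, AVG(weight) AS avg_weight
FROM topic_products
GROUP BY name;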
debezium format
Outside of China, Debezium is a well-known open-source tool similar to Canal. It is more powerful than Canal: besides MySQL, it also supports synchronization from other databases such as PostgreSQL and Oracle. Debezium currently supports serialization formats such as JSON and Apache Avro.
Debezium produces change data in the following format:
{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"source": {...},
"op": "u",
"ts_ms": 1589362330904,
"transaction": null
}
Similarly, when consuming with Flink SQL, the SQL is almost the same as the canal example above; just change the format to debezium-json.
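A minimal sketch of the same table reading Debezium messages from Kafka (the topic name here is made up for illustration):
CREATE TABLE topic_products (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_dbz',                         -- illustrative topic name
  'properties.bootstrap.servers' = 'localhost:9092',
  'properties.group.id' = 'testGroup',
  'format' = 'debezium-json'                        -- the only change compared to the canal example
)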
CanalJson deserialization source code parsing
Let's take a look at the implementation of the canal-json format in Flink's source code. As a Flink format used on the source side, it involves deserialization when reading data. Let's briefly look at how CanalJson deserialization is realized; the concrete implementation class is CanalJsonDeserializationSchema.
Let’s look at the core deserialization method:
@Override
public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
    try {
        // deserialize the message into a RowData
        RowData row = jsonDeserializer.deserialize(message);
        String type = row.getString(2).toString();
        if (OP_INSERT.equals(type)) {
            // If the operation type is INSERT, the "data" array holds the rows to insert.
            // Loop over them, mark each one as INSERT, build the RowData and send it downstream.
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                RowData insert = data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.INSERT);
                out.collect(insert);
            }
        } else if (OP_UPDATE.equals(type)) {
            // "data" holds the rows after the update
            ArrayData data = row.getArray(0);
            // "old" holds the rows before the update
            ArrayData old = row.getArray(1);
            for (int i = 0; i < data.size(); i++) {
                // the underlying JSON deserialization schema always produce GenericRowData.
                GenericRowData after = (GenericRowData) data.getRow(i, fieldCount);
                GenericRowData before = (GenericRowData) old.getRow(i, fieldCount);
                for (int f = 0; f < fieldCount; f++) {
                    if (before.isNullAt(f)) {
                        // "old" only contains the changed fields; unchanged fields are null,
                        // so copy them over from "after". That is, the unchanged fields of the
                        // before and after rows sent downstream are the same.
                        before.setField(f, after.getField(f));
                    }
                }
                before.setRowKind(RowKind.UPDATE_BEFORE);
                after.setRowKind(RowKind.UPDATE_AFTER);
                // send the data before and after the update downstream
                out.collect(before);
                out.collect(after);
            }
        } else if (OP_DELETE.equals(type)) {
            // "data" holds the rows before deletion
            ArrayData data = row.getArray(0);
            for (int i = 0; i < data.size(); i++) {
                RowData insert = data.getRow(i, fieldCount);
                insert.setRowKind(RowKind.DELETE);
                out.collect(insert);
            }
        } else {
            if (!ignoreParseErrors) {
                throw new IOException(format(
                    "Unknown \"type\" value \"%s\". The Canal JSON message is '%s'", type, new String(message)));
            }
        }
    } catch (Throwable t) {
        // a big try catch to protect the processing.
        if (!ignoreParseErrors) {
            throw new IOException(format(
                "Corrupt Canal JSON message '%s'.", new String(message)), t);
        }
    }
}
flink cdc connector
background
With the above architecture, we need to deploy Canal (or Debezium) plus Kafka, and then have Flink consume the data from Kafka. That means operating multiple components, and the data also has to land in Kafka first. Is there a better way to simplify this? Let's look at the CDC connector provided by Flink.
This connector is not included in Flink's own codebase; it lives in a separate repository at github.com/ververica/f…
In this architecture, Flink consumes the incremental logs of the database directly, replacing Canal (or Debezium) as the data acquisition layer, performs the computation, and sends the results downstream. The overall architecture is as follows:
The benefits of using this architecture include:
- Reduced maintenance cost for Canal and Kafka, a shorter pipeline, and lower latency
- Flink provides exactly-once semantics
- Reading can start from a specified position
- Kafka is removed, reducing the cost of storing messages
mysql-cdc
Two built-in connectors are currently provided, for PostgreSQL and MySQL. Let's take MySQL as an example.
To use mysql-cdc, the following dependency needs to be added to the POM:
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <!-- add the dependency matching your database -->
  <artifactId>flink-connector-mysql-cdc</artifactId>
  <version>1.1.0</version>
</dependency>
For the SQL Client, download flink-sql-connector-mysql-cdc-1.1.0.jar and place it under the lib/ directory of your Flink distribution.
Example SQL for connecting to a mysql database is as follows:
CREATE TABLE mysql_binlog (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = 'localhost',
  'port' = '3306',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'inventory',
  'table-name' = 'products'
)
If you subscribe to a PostgreSQL database instead, replace the connector with postgres-cdc. The columns in the DDL schema must correspond one-to-one with the columns of the database table; a sketch follows the link below.
For more detailed configuration, see:
Github.com/ververica/f…
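As a sketch of what such a table could look like (the option names should be double-checked against the documentation linked above for the exact version you use):
CREATE TABLE pg_products (
  id INT NOT NULL,
  name STRING,
  description STRING,
  weight DECIMAL(10,3)
) WITH (
  'connector' = 'postgres-cdc',
  'hostname' = 'localhost',
  'port' = '5432',
  'username' = 'flinkuser',
  'password' = 'flinkpw',
  'database-name' = 'postgres',
  'schema-name' = 'public',       -- PostgreSQL additionally needs the schema name
  'table-name' = 'products'
)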
mysql-cdc connector source code analysis
Let's take mysql-cdc as an example to see how the connector is implemented on the source side. Since it is a SQL connector, it first has a corresponding TableFactory; the factory class then constructs the corresponding source, which finally converts the consumed data into the RowData format that Flink understands and sends it downstream.
Let’s look at the implementation of the Flink CDC source code along these lines.
In the flink-connector-mysql-cdc module, find the corresponding factory class, MySQLTableSourceFactory, and go into its createDynamicTableSource(Context context) method. In this method, a MySQLTableSource is constructed using the hostname, database name, and other information obtained from the DDL properties.
MySQLTableSource
In the MySQLTableSource#getScanRuntimeProvider method, we see that it first constructs a RowDataDebeziumDeserializeSchema. This object is primarily used to convert the SourceRecord data produced by Debezium into RowData objects that Flink recognizes. In the RowDataDebeziumDeserializeSchema#deserialize method, the main work is to determine the operation type of each incoming record (insert, update, delete) and then convert the individual fields according to their data types (short, int, and so on).
Finally, we see that the Source function used by Flink to get the database change log is DebeziumSourceFunction and that the final return type is RowData.
In other words, at the bottom layer Flink uses Debezium to obtain the change data from MySQL, PostgreSQL, and other databases.
@SuppressWarnings("unchecked")
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext scanContext) {
RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
TypeInformation<RowData> typeInfo = (TypeInformation<RowData>) scanContext.createTypeInformation(physicalSchema.toRowDataType());
DebeziumDeserializationSchema<RowData> deserializer = new RowDataDebeziumDeserializeSchema(
rowType,
typeInfo,
((rowData, rowKind) -> {}),
serverTimeZone);
MySQLSource.Builder<RowData> builder = MySQLSource.<RowData>builder()
.hostname(hostname)
..........
DebeziumSourceFunction<RowData> sourceFunction = builder.build();
return SourceFunctionProvider.of(sourceFunction, false);
}
DebeziumSourceFunction
Let’s look at the DebeziumSourceFunction class next
@PublicEvolving
public class DebeziumSourceFunction<T> extends RichSourceFunction<T> implements
CheckpointedFunction,
ResultTypeQueryable<T> {
.............
}
The DebeziumSourceFunction class extends RichSourceFunction and implements the CheckpointedFunction interface; in other words, this class is a Flink SourceFunction that fetches data from the source (in its run method) and sends it downstream. Because it also implements CheckpointedFunction, it can rely on the checkpoint mechanism to guarantee exactly-once semantics. A minimal sketch of this combination is shown below.
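This is not the actual Flink class, just an illustrative skeleton combining the two contracts, with method signatures taken from the Flink API:
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class MyCdcLikeSource extends RichSourceFunction<String> implements CheckpointedFunction {

    private volatile boolean running = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        // fetch data from the external system and emit it downstream
        while (running) {
            synchronized (ctx.getCheckpointLock()) {
                ctx.collect("a change record");
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // persist the current read offsets into Flink state, so a restart
        // can resume from here and preserve exactly-once semantics
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // restore the offsets saved in the last successful checkpoint
    }
}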
Next, let’s go to the run method and see how to get the changed data from the database.
@Override
public void run(SourceContext<T> sourceContext) throws Exception {
    ...........................
    // DO NOT include schema change, e.g. DDL
    properties.setProperty("include.schema.changes", "false");
    ...........................
    // print all the properties for troubleshooting
    // dump the properties
    String propsString = properties.entrySet().stream()
        .map(t -> "\t" + t.getKey().toString() + " = " + t.getValue().toString() + "\n")
        .collect(Collectors.joining());
    LOG.info("Debezium Properties:\n{}", propsString);

    this.debeziumConsumer = new DebeziumChangeConsumer<>(
        sourceContext,
        deserializer,
        restoredOffsetState == null, // DB snapshot phase if restore state is null
        this::reportError);

    // create the engine with this configuration ...
    this.engine = DebeziumEngine.create(Connect.class)
        .using(properties)
        .notifying(debeziumConsumer)
        .using((success, message, error) -> {
            if (!success && error != null) {
                this.reportError(error);
            }
        })
        .build();

    if (!running) {
        return;
    }

    // run the engine asynchronously
    executor.execute(engine);

    // interrupt the engine and rethrow when the program is stopped or an error occurs;
    // on a clean exit, wait for the runner thread
    try {
        while (running) {
            if (executor.awaitTermination(5, TimeUnit.SECONDS)) {
                break;
            }
            if (error != null) {
                running = false;
                shutdownEngine();
                // rethrow the error from Debezium consumer
                ExceptionUtils.rethrow(error);
            }
        }
    } catch (InterruptedException e) {
        // may be the result of a wake-up interruption after an exception.
        // we ignore this here and only restore the interruption state
        Thread.currentThread().interrupt();
    }
}
At the beginning of the method, many properties are set. For example, include.schema.changes is set to false, which means DDL operations on tables are not included and changes to table structure are not captured; we are only concerned with changes to the data itself.
Then a DebeziumChangeConsumer object is constructed. This class implements the DebeziumEngine.ChangeConsumer interface; its main job is to take a batch of change records at a time and process them.
The DebeziumEngine object is the one that does the real work. Under the hood it uses Kafka's Connect API to fetch the data, producing org.apache.kafka.connect.source.SourceRecord objects. The notifying method hands the resulting data over to the DebeziumChangeConsumer defined above, which overrides the default implementation to handle the more complex processing.
The engine is then started asynchronously via an ExecutorService thread pool. Finally, a loop waits on the engine, interrupting it and rethrowing the error when the program is stopped or an error occurs.
To sum up, inside Flink's source function the Debezium engine is used to obtain the database change data (SourceRecord), which, after a series of deserialization steps, is converted into Flink RowData objects and sent downstream.
changelog format
Usage scenarios
When we fetch database changes via mysql-cdc, or write a GROUP BY query, the result data is constantly changing. How do we send these changes to a Kafka topic, which only supports append mode?
So Flink provides a changelog format. We can simply think of it as Flink wrapping the incoming RowData and adding an operation type to it: INSERT, DELETE, UPDATE_BEFORE, or UPDATE_AFTER. This way, when the downstream obtains the data, it can decide how to apply it according to the operation type.
For example, our raw data format looks like this.
{"day":"2020-06-18","gmv":100}
After processing the changelog format, it becomes the following format:
{"data":{"day":"2020-06-18","gmv":100},"op":"+I"}
That is to say, the changelog format wraps the native format and adds an op field to indicate the operation type of the data. The following values are currently used (an illustrative update example follows this list):
- +I: insert operation.
- -U: the data before an update.
- +U: the data after an update.
- -D: delete operation.
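For instance, an update to the gmv value in the record above would produce two messages, one carrying the old row and one carrying the new row (illustrative values):
{"data":{"day":"2020-06-18","gmv":100},"op":"-U"}
{"data":{"day":"2020-06-18","gmv":200},"op":"+U"}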
Example
To use it, the corresponding dependency needs to be added to the POM:
<dependency>
  <groupId>com.alibaba.ververica</groupId>
  <artifactId>flink-format-changelog-json</artifactId>
  <version>1.1.0</version>
</dependency>
Flink SQL is used as follows:
CREATE TABLE kafka_gmv (
day_str STRING,
gmv DECIMAL(10, 5)
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_gmv',
'scan.startup.mode' = 'earliest-offset',
'properties.bootstrap.servers' = 'localhost:9092',
'format' = 'changelog-json'
);
We define a Kafka table with the changelog-json format, which we can then write to and query.
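A typical way to feed this table is to insert the result of an aggregation over a CDC source, for example (a sketch assuming a mysql-cdc source table named orders with order_date, order_status, and gmv columns; those names are made up for illustration):
INSERT INTO kafka_gmv
SELECT
  DATE_FORMAT(order_date, 'yyyy-MM-dd') AS day_str,
  SUM(gmv) AS gmv
FROM orders
WHERE order_status = true
GROUP BY DATE_FORMAT(order_date, 'yyyy-MM-dd');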
For the complete code and configuration, see: github.com/ververica/f…
Source code analysis
As a Flink format, we mainly look at its serialization and deserialization methods. changelog-json uses the flink-json package for JSON processing.
deserialization
Deserialization is handled by the ChangelogJsonDeserializationSchema class. In its constructor we can see that it mainly constructs a JSON deserializer, jsonDeserializer, which does the actual data processing.
public ChangelogJsonDeserializationSchema(
        RowType rowType,
        TypeInformation<RowData> resultTypeInfo,
        boolean ignoreParseErrors,
        TimestampFormat timestampFormatOption) {
    this.resultTypeInfo = resultTypeInfo;
    this.ignoreParseErrors = ignoreParseErrors;
    this.jsonDeserializer = new JsonRowDataDeserializationSchema(
        createJsonRowType(fromLogicalToDataType(rowType)),
        // the result type is never used, so it's fine to pass in Debezium's result type
        resultTypeInfo,
        false, // ignoreParseErrors already contains the functionality of failOnMissingField
        ignoreParseErrors,
        timestampFormatOption);
}
The createJsonRowType method specifies that the changelog format is a Row format.
private static RowType createJsonRowType(DataType databaseSchema) {
DataType payload = DataTypes.ROW(
DataTypes.FIELD("data", databaseSchema),
DataTypes.FIELD("op", DataTypes.STRING()));
return (RowType) payload.getLogicalType();
}
Here, we specify that the row format has two fields, data, which represents the content of the data, and op, which represents the type of operation.
Finally, let's look at the core method ChangelogJsonDeserializationSchema#deserialize(byte[] bytes, Collector<RowData> out):
@Override
public void deserialize(byte[] bytes, Collector<RowData> out) throws IOException {
    try {
        GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(bytes);
        GenericRowData data = (GenericRowData) row.getField(0);
        String op = row.getString(1).toString();
        RowKind rowKind = parseRowKind(op);
        data.setRowKind(rowKind);
        out.collect(data);
    } catch (Throwable t) {
        // a big try catch to protect the processing.
        if (!ignoreParseErrors) {
            throw new IOException(format(
                "Corrupt Debezium JSON message '%s'.", new String(bytes)), t);
        }
    }
}
The jsonDeserializer processes the raw bytes, then the second field, op, is inspected and the corresponding RowKind is set on the data row.
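The mapping from the op string to a RowKind happens in the parseRowKind helper called above; a minimal sketch of what that mapping looks like (the actual implementation may differ in details such as error handling):
// requires org.apache.flink.types.RowKind
private static RowKind parseRowKind(String op) {
    switch (op) {
        case "+I":
            return RowKind.INSERT;         // insert
        case "-U":
            return RowKind.UPDATE_BEFORE;  // data before an update
        case "+U":
            return RowKind.UPDATE_AFTER;   // data after an update
        case "-D":
            return RowKind.DELETE;         // delete
        default:
            throw new UnsupportedOperationException(
                "Unsupported operation '" + op + "' for row kind.");
    }
}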
serialization
For serialization, let's look at the method ChangelogJsonSerializationSchema#serialize:
@Override
public byte[] serialize(RowData rowData) {
reuse.setField(0, rowData);
reuse.setField(1, stringifyRowKind(rowData.getRowKind()));
return jsonSerializer.serialize(reuse);
}
Flink’s RowData is serialized into a byte array using jsonSerializer.
References:
[1] www.bilibili.com/video/BV1zt.
[2] github.com/ververica/f…
My knowledge is limited and mistakes are inevitable, so please don't hesitate to point them out. For more content, please follow my WeChat public account [Big Data Technology and Application Practice]. Thank you.