Data synchronization is generally divided into two modes: full and incremental. Incremental data is a typical type of streaming data, and log-based incremental synchronization has become standard in almost all databases: it reduces the impact of routine ETL work on the source system and greatly lowers data latency. As Greenplum's streaming computing engine, Greenplum Stream Server (GPSS) synchronizes incremental data from different sources into Greenplum. To better support this scenario, incremental synchronization has been enhanced in the upcoming GPSS 1.3.6 release.
Greenplum Stream Server (GPSS) is Greenplum's next-generation data loading solution. Compared with gpfdist, GPSS supports streaming data, provides an API interface, scales better, offers richer functionality, and exposes more fine-grained job control. The enhancements to incremental synchronization in the upcoming GPSS 1.3.6 release include:
- An incremental sort field can be specified to ensure that only the latest message for a record takes effect
- MERGE mode supports INSERT, UPDATE, and DELETE operations
Using MySQL as an example, this article briefly describes how GPSS implements incremental synchronization to Greenplum.
The test environment
- MySQL 8.0.13
- Maxwell 1.25.0
- Kafka 2.2.2
- Greenplum 6.4.0
- GPSS 1.3.6
What we want to accomplish is:
- Use Maxwell to capture incremental changes from the MySQL binlog
- Send the incremental data to Kafka in JSON format (not covered in detail here)
- Parse the JSON messages in Kafka with GPSS
- Apply the changed data to the target table in Greenplum
The configuration and use of MySQL and Maxwell are not described in depth in this article; please refer to their documentation and the related articles for details. A minimal setup sketch is given below for reference.
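As a rough sketch of the source side, assuming a MySQL host named mysqlhost and the Kafka broker and topic used later in this article (user, password, and host names are placeholders), row-based binary logging must first be enabled in my.cnf:

[mysqld]
server_id=1
log-bin=master
binlog_format=row

Maxwell is then started with a MySQL account that has replication privileges and pointed at Kafka:

bin/maxwell --user='maxwell' --password='XXXXXX' --host='mysqlhost' \
    --producer=kafka --kafka.bootstrap.servers=kafkahost:9092 --kafka_topic=test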
Introduction to Test Data
The tables used in the tests are defined in MySQL as follows:
create table t_update_delete_0 (k1 decimal,
k2 text,
v1 decimal,
v2 decimal,
v3 text,
c1 decimal,
c2 text);
k1 and k2 are used as the key that uniquely identifies a record, while v1, v2, and v3 are the columns whose data is updated each time.
INSERT, UPDATE, and DELETE operations are performed on this table at the source side, with each statement being a separate transaction.
The INSERT statement is:
insert into t_update_delete_0 (k1,k2,v1,v2,v3,c1,c2)
values (1,'k_1', 1, 3, 'v_1', 1, 'c1');
The UPDATE statement is:
update t_update_delete_0 set v1=100,v2=300,v3='v_100' where k1='1' and k2='k_1';
The DELETE statement is:
delete from t_update_delete_0 where k1='1' and k2='k_1';
Kafka message format
Maxwell parses the captured binlog into JSON and sends it to Kafka. Different operations generate slightly different Kafka messages. To properly restore these messages in Greenplum, let's briefly analyze the three types of messages.
An example of the message generated during Insert is as follows:
{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "insert",
  "ts": 1586956781,
  "xid": 1398209,
  "commit": true,
  "data": {
    "k1": 41,
    "k2": "k_41",
    "v1": 818,
    "v2": 2454,
    "v3": "v_818",
    "c1": 41,
    "c2": "c_41"
  }
}
database and table give the source database and table names, ts and xid indicate the order of the messages, and type and data describe the operation and the corresponding data. These fields are common to all message types.
A DELETE generates the following message: type is "delete" and data contains the complete row.
{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "delete",
  "ts": 1586956781,
  "xid": 1398195,
  "commit": true,
  "data": {
    "k1": 44,
    "k2": "k_44",
    "v1": 744,
    "v2": 2232,
    "v3": "v_744",
    "c1": 44,
    "c2": "c_44"
  }
}
An UPDATE message contains the old data as well as the new data; we only need the new data.
{
  "database": "test",
  "table": "t_update_delete_0",
  "type": "update",
  "ts": 1586956707,
  "xid": 1281915,
  "commit": true,
  "data": {
    "k1": 99,
    "k2": "k_99",
    "v1": 798,
    "v2": 2394,
    "v3": "v_798",
    "c1": 99,
    "c2": "c_99"
  },
  "old": {
    "v1": 800,
    "v2": 2400,
    "v3": "v_800"
  }
}
Based on the generated messages, we need to do the following (a conceptual SQL sketch follows this list):
- Sort the data by ts and xid
- Match records by k1 and k2
- Delete the records whose type is delete
- Merge (upsert) the records of the other types
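As an illustration only (this is not what GPSS executes internally), the intended per-batch semantics can be sketched in SQL. Here staging_batch is a hypothetical table holding the parsed messages of one batch, with the same columns as the target table defined in the next section:

-- keep only the latest message per (k1, k2), ordered by (ts, xid)
create temp table batch_latest as
select distinct on (k1, k2) *
from staging_batch
order by k1, k2, ts desc, xid desc;

-- apply deletes for messages flagged as deleted
delete from t_update_delete_0 t
using batch_latest b
where t.k1 = b.k1 and t.k2 = b.k2 and b.del_mark;

-- update rows whose key already exists in the target table
update t_update_delete_0 t
set v1 = b.v1, v2 = b.v2, v3 = b.v3
from batch_latest b
where t.k1 = b.k1 and t.k2 = b.k2 and not b.del_mark;

-- insert rows whose key does not exist yet
insert into t_update_delete_0
select b.*
from batch_latest b
where not b.del_mark
  and not exists (
    select 1 from t_update_delete_0 t
    where t.k1 = b.k1 and t.k2 = b.k2
  );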
Executing the GPSS Kafka job
The table definition in Greenplum contains the sort fields used to determine the order in which messages are applied, plus a soft-delete flag; it is defined as follows:
create table t_update_delete_0 (k1 decimal,
k2 text,
v1 decimal,
v2 decimal,
v3 text,
c1 decimal,
c2 text,
ts decimal,
xid decimal,
del_mark boolean);
The YAML configuration file used by GPSS for this data synchronization is as follows:
DATABASE: test
USER: gpadmin
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
  INPUT:
    SOURCE:
      BROKERS: kafkahost:9092
      TOPIC: test
    VALUE:
      COLUMNS:
        - NAME: c1
          TYPE: json
      FORMAT: json
    ERROR_LIMIT: 100
  OUTPUT:
    MODE: MERGE
    MATCH_COLUMNS:
      - k1
      - k2
    UPDATE_COLUMNS:
      - v1
      - v2
      - v3
    ORDER_COLUMNS:
      - ts
      - xid
    DELETE_CONDITION: del_mark
    TABLE: t_update_delete_0
    MAPPING:
      k1 : (c1->'data'->>'k1')::decimal
      k2 : (c1->'data'->>'k2')::text
      v1 : (c1->'data'->>'v1')::decimal
      v2 : (c1->'data'->>'v2')::decimal
      v3 : (c1->'data'->>'v3')::text
      c1 : (c1->'data'->>'c1')::decimal
      c2 : (c1->'data'->>'c2')::text
      ts : (c1->>'ts')::decimal
      xid: (c1->>'xid')::decimal
      del_mark: (c1->>'type')::text = 'delete'
  COMMIT:
    MINIMAL_INTERVAL: 2000
The meanings of the main configuration options are as follows:
- ORDER_COLUMNS: the sort columns GPSS uses within each batch; among records with the same MATCH_COLUMNS, only the message with the largest ORDER_COLUMNS value is applied to the target table
- DELETE_CONDITION: the soft-delete flag; GPSS deletes target records that match a message whose DELETE_CONDITION field is true
- MATCH_COLUMNS: the columns that identify a record, i.e. the candidate key
- UPDATE_COLUMNS: the columns to be updated
In summary, GPSS performs the following steps:
- Within a batch, all records with the same MATCH_COLUMNS are first de-duplicated according to ORDER_COLUMNS
- When a record matching MATCH_COLUMNS already exists in the target table, the update or delete operation is performed (according to UPDATE_COLUMNS or DELETE_CONDITION)
- When no matching record exists in the target table, an insert operation is performed
(To prevent data loss due to de-duplication, the Kafka messages for updates must contain the entire row, not just the changed columns.)
Once the configuration file is ready, we perform the load via gpkafka:
gpkafka load mysql.yaml
gpkafka pulls the corresponding messages from Kafka and synchronizes the incremental data to the target table as specified in the configuration.
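As a quick sanity check (a hypothetical verification step, not part of the GPSS workflow itself), you can query the target table for the row that was inserted, updated, and finally deleted in the examples above:

select * from t_update_delete_0 where k1 = 1 and k2 = 'k_1';
-- returns no rows once the DELETE above has been applied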
Summary
This article briefly described how to use GPSS to perform incremental synchronization from MySQL; other databases (such as Oracle, SQL Server, etc.) can be synchronized with a similar scheme. Different message types require different processing logic, and the GPSS configuration file offers many options for this kind of processing (offset sorting, etc.); for more details, refer to the official documentation: gpdb.docs.pivotal.io/streaming-s… You are welcome to try it out, give feedback, and share suggestions. You are also welcome to visit askGP (ask.greenplum.cn).
This article briefly describes how to use GPSS to perform incremental synchronization from MySQL. Other databases (such as Oracle, SQL Server, etc.) can also use a similar scheme to achieve synchronization. Different message types require different processing logic, GPSS configuration file there is a lot to the reprocessing part, more detailed content you can refer to the official document: GPDB. Docs. The pivotal. IO/streaming – s… Offset sort, etc.; Welcome to use, feedback, guidance. You are also welcome to visit askGP (ask.greenplum. Cn).