The steps for implementing MySQL to Greenplum incremental synchronization were described in detail in “How to Implement MySQL to Greenplum Incremental Synchronization with GPSS”. Today I’m going to talk about how to implement Oracle to Greenplum synchronization.

Although the Oracle database still holds a dominant position in the OLTP field, it lags significantly behind Greenplum in the OLAP field, and more and more analytical workloads are migrating from Oracle to Greenplum. The “How to Migrate from Oracle to Greenplum” series covers migration best practices in detail; at the core of data migration is how to achieve real-time incremental synchronization of data.

For incremental synchronization, GPSS, as a streaming framework, is decoupled from the source side: as long as the messages in the Kafka topic contain enough information, GPSS can extract the changed data and apply it to Greenplum. In the previous article we used GPSS to synchronize incremental data from MySQL via Maxwell; here we use Oracle GoldenGate to synchronize incremental data from Oracle in real time.

1 Test Environment

  • Oracle

  • Oracle GoldenGate

  • Kafka 2.2.2

  • Greenplum 6.4.0

  • GPSS 1.3.6

What we want to accomplish is:

  • Send delta data from Oracle to Kafka in JSON format via GoldenGate

  • Parse JSON messages in Kafka using GPSS

  • Update the changed data to the target table in Greenplum

2 Introduction to Test Data

The tables used for the tests are defined in Oracle as follows:

CREATE TABLE SIEBEL_TEST.TEST_POC(
   ID numeric,
   NAME varchar2(50),
   BIRTHDAY date
);

ID is the key that uniquely identifies a record, and NAME and BIRTHDAY are the update fields.

INSERT, UPDATE, and DELETE operations are performed on this table at the source side.

The Insert statement is:

insert into test_poc values (1, 'Igor', '01-JAN-2000');

The Update statement is:

update test_poc set birthday=add_months(birthday,1) where id <3;

The Delete statement is:

delete from test_poc where id=3;

3 Kafka Message Format

Next, let’s briefly analyze the three types of GoldenGate messages.

An example of the message generated during Insert is as follows:

{
  "table": "SIEBEL_TEST.TEST_POC",
  "op_type": "I",
  "op_ts": "2019-11-21 10:05:34.000000",
  "current_ts": "2019-11-21T11:05:37.823000",
  "pos": "00000000250000058833",
  "tokens": {
    "TK_OPTYPE": "INSERT",
    "SCN": ""
  },
  "after": {
    "ID": 1,
    "NAME": "Igor",
    "BIRTHDAY": "2000-01-01 00:00:00"
  }
}

table is the name of the source table, and current_ts is the time at which the operation occurred, which we use here for ordering. op_type and after indicate the operation type and the corresponding data.
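
These fields can be extracted in Greenplum with the standard PostgreSQL JSON operators (-> returns a JSON object, ->> returns text), which is what the MAPPING expressions later in this article rely on. A minimal sketch that can be run directly in psql (the sample message below is hand-written and abbreviated for illustration):

-- Extract fields from a sample insert message with the -> and ->> operators.
SELECT msg->>'op_type'                    AS op_type,   -- 'I'
       (msg->'after'->>'ID')::integer     AS id,        -- 1
       msg->'after'->>'NAME'              AS name,      -- 'Igor'
       (msg->'after'->>'BIRTHDAY')::date  AS birthday,  -- 2000-01-01
       (msg->>'current_ts')::timestamp    AS ts
FROM (SELECT '{"table":"SIEBEL_TEST.TEST_POC","op_type":"I",
               "current_ts":"2019-11-21T11:05:37.823000",
               "after":{"ID":1,"NAME":"Igor","BIRTHDAY":"2000-01-01 00:00:00"}}'::json AS msg) t;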

A Delete produces the following message: op_type is “D”, and before contains the complete record.

{
  "table": "SIEBEL_TEST.TEST_POC",
  "op_type": "D",
  "op_ts": "2019-11-21 10:13:19.000000",
  "current_ts": "2019-11-21T11:13:23.060002",
  "pos": "00000000250000059999",
  "tokens": {
    "TK_OPTYPE": "DELETE",
    "SCN": ""
  },
  "before": {
    "ID": 3,
    "NAME": "Gianluca",
    "BIRTHDAY": "2002-01-01 00:00:00"
  }
}

An Update contains the new data (after) as well as the previous data (before), with op_type “U”.

{
  "table": "SIEBEL_TEST.TEST_POC",
  "op_type": "U",
  "op_ts": "2019-11-21 10:13:19.000000",
  "current_ts": "2019-11-21T11:13:23.060000",
  "pos": "00000000250000059561",
  "tokens": {
    "TK_OPTYPE": "SQL COMPUPDATE",
    "SCN": ""
  },
  "before": {
    "ID": 1,
    "NAME": "Igor",
    "BIRTHDAY": "2000-01-01 00:00:00"
  },
  "after": {
    "ID": 1,
    "NAME": "Igor",
    "BIRTHDAY": "2000-02-01 00:00:00"
  }
}

Based on the generated messages, we need to do the following (a SQL sketch of this logic follows the list):

  • Deduplicate messages by ID

  • Sort messages by ts

  • Delete the rows whose op_type is D

  • Merge (upsert) the other types
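
GPSS’s MERGE mode handles all of this internally. Conceptually, the same logic could be expressed in Greenplum SQL roughly as below; stage_poc is a hypothetical staging table holding one parsed message per row, used only to illustrate the idea, not how GPSS is actually implemented:

-- 1. Keep only the newest message per id (dedup by id, ordered by ts).
CREATE TEMP TABLE latest_poc AS
SELECT DISTINCT ON (id) id, name, birthday, ts, op_type
FROM stage_poc
ORDER BY id, ts DESC;

-- 2. Remove the matched rows from the target table (covers both deletes
--    and rows that will be re-inserted with fresher values).
DELETE FROM test_poc t USING latest_poc l WHERE t.id = l.id;

-- 3. Re-insert everything that is not a delete (the merge/upsert step).
INSERT INTO test_poc (id, name, birthday, ts)
SELECT id, name, birthday, ts
FROM latest_poc
WHERE op_type <> 'D';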

4 Run the GPSS Kafka Job

The target table in Greenplum contains an extra sorting field, ts, to distinguish the order in which messages were generated. It is defined as follows:

CREATE TABLE test_poc(
   id numeric,
   name varchar (50),
   birthday date,
   ts timestamp
);

The YAML configuration file required by GPSS for data synchronization is as follows:

DATABASE: test
USER: gpadmin
HOST: mdw
PORT: 5432
VERSION: 2
KAFKA:
   INPUT:
      SOURCE:
        BROKERS: kafkahost:9092
        TOPIC: oggpoc
      VALUE:
        COLUMNS:
          - NAME: c1
            TYPE: json
        FORMAT: json
      ERROR_LIMIT: 100
   OUTPUT:
      MODE: MERGE
      MATCH_COLUMNS:
        - id
      UPDATE_COLUMNS:
        - name
        - birthday
      ORDER_COLUMNS:
        - ts
      DELETE_CONDITION: c1->>'op_type' = 'D'
      TABLE: test_poc
      MAPPING:
         - NAME: id
           EXPRESSION: |
               CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'ID')::integer
               ELSE (c1->'before'->>'ID')::integer end
         - NAME: name
           EXPRESSION: |
              CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'NAME')::text
              ELSE null end
         - NAME: birthday
           EXPRESSION: |
              CASE WHEN ((c1->'after')::json is not null) THEN (c1->'after'->>'BIRTHDAY')::date
              ELSE null end
         - NAME: ts
           EXPRESSION: (c1->>'current_ts')::timestamp
   COMMIT:
      MINIMAL_INTERVAL: 2000

See Reference [2] for the meaning of the related fields and the actual operations performed by GPSS. The main difference here is that, since the actual content of insert and update operations is contained in after while the content of delete operations is contained in before, each field mapping needs additional judgment logic: read from after when after is present, otherwise read from before.
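
For example, the CASE expression used for the id column can be sanity-checked in psql against a sample delete message (which only carries before); this query is just a quick check, not part of the GPSS job:

-- For a delete message "after" is absent, so the CASE falls through to "before".
SELECT CASE WHEN ((msg->'after')::json is not null) THEN (msg->'after'->>'ID')::integer
            ELSE (msg->'before'->>'ID')::integer
       END AS id    -- returns 3
FROM (SELECT '{"op_type":"D",
               "before":{"ID":3,"NAME":"Gianluca","BIRTHDAY":"2002-01-01 00:00:00"}}'::json AS msg) t;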

In addition, if incoming messages have duplicate values in ORDER_COLUMNS, GPSS will write all rows containing the duplicated values to the target table; the main purpose of this is to avoid data loss. Therefore, in practical use, we must ensure that the sorting field is unique.

Once the configuration file is ready, we perform the load via gpkafka:

gpkafka load ogg.yaml

gpkafka pulls the corresponding messages from Kafka and synchronizes the incremental data to the target table as specified.
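
Once the job has caught up, the result can be spot-checked on the Greenplum side, for example:

-- Verify the synchronized rows in the target table.
SELECT id, name, birthday, ts
FROM test_poc
ORDER BY id;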

5 Summary

This article briefly described how to use GPSS to consume GoldenGate messages from Kafka and synchronize Oracle incremental data to Greenplum. Other databases and CDC tools (such as Informatica, StreamSets, NiFi, etc.) can be handled with a similar scheme. More related introductions will follow; you are welcome to try it out and share feedback and suggestions.

6 References

  1. github.com/pdeemea/kaf…

  2. How to implement incremental synchronization from MySQL to Greenplum using GPSS

  3. gpdb.docs.pivotal.io/streaming-s…