
CloudCanal recently added real-time synchronization from MySQL (RDS) to ClickHouse, covering structure migration, full data migration, and incremental synchronization, along with the accompanying monitoring, alerting, and HA capabilities built into the platform.

ClickHouse itself does not directly support Update and Delete operations, but two engines in its MergeTree family, CollapsingMergeTree and VersionedCollapsingMergeTree, can emulate them by folding rows. This is good enough for real-time incremental synchronization, and the performance is more than adequate: it can easily sustain over 1k RPS.

The following article briefly describes how CloudCanal implements this capability and how to use it as a user.

Technical points

Structure migration

CloudCanal uses CollapsingMergeTree as the default table engine, adds a flag field __cc_ck_sign with a default value, and uses the source table's primary key as the sort key, as in the following example:

 CREATE TABLE console.worker_stats
(
    `id` Int64,
    `gmt_create` DateTime,
    `worker_id` Int64,
    `cpu_stat` String,
    `mem_stat` String,
    `disk_stat` String,
    `__cc_ck_sign` Int8 DEFAULT 1
)
ENGINE = CollapsingMergeTree(__cc_ck_sign)
ORDER BY id
SETTINGS index_granularity = 8192

Among ClickHouse table engines, both CollapsingMergeTree and VersionedCollapsingMergeTree fold rows according to the sign flag, thereby achieving the effect of updates and deletes. The advantage of VersionedCollapsingMergeTree over CollapsingMergeTree is that it tolerates changes to the same record arriving out of order, but CloudCanal chose CollapsingMergeTree for two main reasons:

  • CloudCanal always writes changes to the same record in the order they occurred in the source database, so out-of-order writes cannot happen
  • There is no need to maintain the version field required by VersionedCollapsingMergeTree (usually named Version, though any name is allowed)

So CloudCanal uses CollapsingMergeTree as the default table engine.
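To illustrate the folding semantics this choice relies on, here is a sketch based on the example table above (illustrative values, not CloudCanal output): an Update on the source becomes a pair of rows in ClickHouse, a cancel row carrying the before-image with __cc_ck_sign = -1 and a state row carrying the after-image with __cc_ck_sign = 1.

```sql
-- An Update of cpu_stat from 'old' to 'new' for id = 1 is written as two rows:
INSERT INTO console.worker_stats
    (id, gmt_create, worker_id, cpu_stat, mem_stat, disk_stat, __cc_ck_sign)
VALUES
    (1, now(), 10, 'old', '{}', '{}', -1),  -- cancel row: before-image, sign -1
    (1, now(), 10, 'new', '{}', '{}', 1);   -- state row: after-image, sign 1
```

During background merges, CollapsingMergeTree collapses each pair of rows that share the same sort key and carry opposite signs, leaving only the latest state.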

Writing data

CloudCanal writes two kinds of data: full data and incremental data, that is, a one-time migration of existing data and long-running synchronization of changes. The two are written slightly differently. For full writes to the target, the main work is batch writing with multiple threads; since structure migration already gives the flag field __cc_ck_sign a default value of 1, no special handling is needed.

For increments, CloudCanal needs to do three things.

  • Convert Update and Delete operations to Insert

There are two things to do in this step. The first is to fill in the flag field according to the operation type: Insert and Update get 1, Delete gets -1. The second is to copy the before-image or after-image of each incremental change into the result records for the subsequent Insert writes.

for (CanalRowChange rowChange : rowChanges) {
    switch (rowChange.getEventType()) {
        case INSERT: {
            for (CanalRowData rowData : rowChange.getRowDatasList()) {
                rowData.getAfterColumnsList().add(nonDeleteCol);
                records.add(rowData.getAfterColumnsList());
            }
            break;
        }
        case UPDATE: {
            for (CanalRowData rowData : rowChange.getRowDatasList()) {
                rowData.getBeforeColumnsList().add(deleteCol);
                records.add(rowData.getBeforeColumnsList());
                rowData.getAfterColumnsList().add(nonDeleteCol);
                records.add(rowData.getAfterColumnsList());
            }
            break;
        }
        case DELETE: {
            for (CanalRowData rowData : rowChange.getRowDatasList()) {
                rowData.getBeforeColumnsList().add(deleteCol);
                records.add(rowData.getBeforeColumnsList());
            }
            break;
        }
        default:
            throw new CanalException("not supported event type, eventType:" + rowChange.getEventType());
    }
}
  • Group by table

Because Insert, Update, and Delete operations are all converted to Inserts carrying full row images (every field populated), the records can be grouped by table and then written in batches. Even a single thread meets the synchronization performance requirements in most scenarios.

protected Map<TableUnit, List<CanalRowChange>> groupByTable(IncrementMessage message) {
    Map<TableUnit, List<CanalRowChange>> data = new HashMap<>();
    for (ParsedEntry entry : message.getEntries()) {
        if (entry.getEntryType() == CanalEntryType.ROWDATA) {
            CanalRowChange rowChange = entry.getRowChange();
            if (!rowChange.isDdl()) {
                List<CanalRowChange> changes = data.computeIfAbsent(
                        new TableUnit(entry.getHeader().getSchemaName(), entry.getHeader().getTableName()),
                        k -> new ArrayList<>());
                changes.add(rowChange);
            }
        }
    }
    return data;
}
  • Write in parallel

The data grouped by table is then written through a parallel execution framework, which is not detailed here.
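The parallel step can be pictured with a minimal sketch like the following. This is not CloudCanal's actual code: the class name, the plain string row type, and the fixed pool size of 4 are all assumptions made for illustration. The idea is simply that batches for different tables are independent, so each can be flushed on its own worker thread.

```java
import java.util.*;
import java.util.concurrent.*;

// Hypothetical sketch (not CloudCanal's actual implementation): flush each
// table's batch on its own worker thread and wait for every write to finish.
public class ParallelTableWriter {
    private final ExecutorService pool = Executors.newFixedThreadPool(4);
    // Records how many rows were "written" per table, standing in for real inserts.
    public final ConcurrentMap<String, Integer> written = new ConcurrentHashMap<>();

    public void writeAll(Map<String, List<String>> rowsByTable) throws Exception {
        List<Future<?>> futures = new ArrayList<>();
        for (Map.Entry<String, List<String>> e : rowsByTable.entrySet()) {
            // Rows of different tables are independent, so flush them concurrently.
            futures.add(pool.submit(() -> insertBatch(e.getKey(), e.getValue())));
        }
        for (Future<?> f : futures) {
            f.get(); // propagate any write failure to the caller
        }
        pool.shutdown();
    }

    protected void insertBatch(String table, List<String> rows) {
        // The real product would issue one batched INSERT per table group here.
        written.merge(table, rows.size(), Integer::sum);
    }
}
```

Waiting on every Future before returning keeps the per-table write order intact across batches, which matters for the folding semantics described earlier.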

Example

  • Add the data sources
  • Create a task: select the data sources and databases; once the connection test succeeds, click Next
  • Select Data Synchronization; a specification of at least 1 GB is recommended. For MySQL->ClickHouse, the structure-migration option is currently applied automatically, so selecting it has no effect. Click Next
  • Select the tables. By default they will be created on ClickHouse with the CollapsingMergeTree table engine, and the __cc_ck_sign collapsing flag field is added automatically. Click Next
  • Select the columns and click Next
  • Create the task
  • Wait for the task to complete automatic structure migration and full migration, and for data synchronization to catch up
  • Generate Insert, Update, and Delete load on the source
  • After the delay levels off, stop the load
  • Check the data in the source MySQL table; one table is used as an example below
  • Check the peer ClickHouse table data: inconsistent?!
  • Manually optimize the table, after which the data is consistent. You can simply wait for ClickHouse to merge automatically, but run a manual optimize if you need accurate results right away (note: manual optimization can put heavy pressure on the database machine)
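The manual optimization in the last step can be done with a statement like the following (using the worker_stats example table from earlier; as noted, a forced merge is expensive on a busy server):

```sql
-- Force an immediate merge so folded rows collapse right away:
OPTIMIZE TABLE console.worker_stats FINAL;
```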

Q&A

What if I have created a table on ClickHouse?

At present, it is recommended to directly use CloudCanal automatic structure migration to create tasks.

If you have already created a table with the CollapsingMergeTree engine, add the flag field __cc_ck_sign Int8 DEFAULT 1 to it, then create the task and choose the existing table instead of automatic structure migration.
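Assuming the existing table is the worker_stats example from earlier (adapt the database and table names to your own), adding the flag field would look like this:

```sql
ALTER TABLE console.worker_stats ADD COLUMN __cc_ck_sign Int8 DEFAULT 1;
```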

Other table engines are not supported for the time being (mainly because of the incremental capability, which CloudCanal still needs to explore further).

When will the synchronized data be merged?

When CloudCanal synchronizes data to ClickHouse, ClickHouse does not merge the folded rows in real time and does not guarantee consistency before a merge. So the general approach is to wait for a background merge, or to trigger one manually (which causes high machine load and heavy IO), for example: OPTIMIZE TABLE worker_stats FINAL.
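Besides waiting for or forcing a merge, ClickHouse's documented pattern for CollapsingMergeTree is to fold rows at query time by aggregating over the sign column. A sketch against the example table (this works for numeric columns; it is a generic ClickHouse technique, not CloudCanal output):

```sql
-- Rows whose signs cancel out (deleted or superseded states) are filtered by HAVING.
SELECT id, sum(worker_id * __cc_ck_sign) AS worker_id
FROM console.worker_stats
GROUP BY id
HAVING sum(__cc_ck_sign) > 0;
```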

What does DDL do?

Currently CloudCanal does not support DDL synchronization to ClickHouse; the product simply ignores DDL events. So if you need a DDL change, the recommendation is: when adding a field, add it on the target first and then on the source; when removing a field, do it in the opposite order.
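For example, to add a column (the column name region and its types here are purely illustrative), the suggested order would be:

```sql
-- 1. Add the column on the ClickHouse target first:
ALTER TABLE console.worker_stats ADD COLUMN region String DEFAULT '';

-- 2. Then add the matching column on the MySQL source:
-- ALTER TABLE console.worker_stats ADD COLUMN region VARCHAR(32) NOT NULL DEFAULT '';
```

Adding on the target first ensures that incremental rows carrying the new field always have a destination column to land in.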

Conclusion

This article briefly described CloudCanal's one-stop, real-time data synchronization from MySQL (RDS) to ClickHouse, covering the technical points, an example, and frequently asked questions. If there are any mistakes in the article, corrections are welcome. You are also welcome to try the product out and offer your comments and suggestions.

At the end of the article is our community edition download post; you are welcome to download the product, try it out, and quickly build your own online data tasks.