Takeaway

In the previous article, "ShardingJdbc sub-database sub-table combat case analysis (1)", we introduced how to use ShardingJdbc to spread order data across multiple databases and tables. This article focuses on how to expand sharded data storage online without stopping the service. Two common scenarios will be used for the demonstration:

  • 1) How to smoothly introduce sub-databases and sub-tables for a system that still runs on a single database and table;
  • 2) How to carry out a second expansion for a system that already has sub-databases and sub-tables, when the original split is no longer enough because of continuous data growth.

Overview of implementation plans

Before the concrete demonstration, let's briefly go over several common approaches to migrating data when splitting databases and tables:

1) Stop service migration

The stop-service migration scheme is the most common and relatively safe data migration scheme, but it is also the least desirable one. It is safe because no new data is written after the service is stopped, which guarantees that the migration runs in a stable environment and largely avoids data inconsistency during the migration.

However, stopping the service seriously hurts the user experience and reduces service availability, and if the data volume is large the migration can take a long time. For Internet companies that emphasize "four nines" (99.99%) availability, a long outage is simply unacceptable.

Generally speaking, the stop-service scheme is more suitable for systems that do not have to run 7x24 and have extremely high requirements on data consistency, such as social security funds and core banking systems. This is not to say that a non-stop scheme cannot achieve fully accurate migration; it is just that in such systems, management, regulation and other non-technical factors decide whether a small probability of risk can be tolerated.

The process of a stop-service migration is roughly as follows:

1. Rehearse in advance, estimate the service interruption time, and publish a downtime notice;

2. Stop the service and use the data migration tool prepared in advance (usually a migration script) to migrate the data according to the new sharding rules (see the sketch after this list);

3. Check the accuracy of the migrated data;

4. Modify the application code, switch to the new data sharding read/write rules, and complete test verification;

5. Start the service and restore external traffic.
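To make step 2 concrete, below is a minimal sketch of such a migration script in Java/JDBC. It assumes the original single table is t_order in a database called order_old, that the new layout is 2 libraries x 32 tables sharded by user_id % 2 and order_id % 32, and it copies only three illustrative columns; the connection strings, column names and error handling are hypothetical and would need to be adapted (and batched) for real data volumes.

import java.sql.*;

public class StopServiceMigration {
    // Hypothetical connection strings; replace with the real ones.
    static final String OLD_URL = "jdbc:mysql://127.0.0.1:3306/order_old?useSSL=false";
    static final String[] NEW_URLS = {
        "jdbc:mysql://127.0.0.1:3306/order_0?useSSL=false",
        "jdbc:mysql://127.0.0.1:3306/order_1?useSSL=false"
    };

    public static void main(String[] args) throws SQLException {
        try (Connection oldConn = DriverManager.getConnection(OLD_URL, "root", "123456");
             Connection ds0 = DriverManager.getConnection(NEW_URLS[0], "root", "123456");
             Connection ds1 = DriverManager.getConnection(NEW_URLS[1], "root", "123456");
             Statement read = oldConn.createStatement();
             // Stream the old table; only a few columns are shown for brevity.
             ResultSet rs = read.executeQuery("SELECT order_id, user_id, amount FROM t_order")) {
            Connection[] targets = {ds0, ds1};
            while (rs.next()) {
                long orderId = rs.getLong("order_id");
                long userId = rs.getLong("user_id");
                // New sharding rule: library = user_id % 2, table = order_id % 32.
                Connection target = targets[(int) (userId % 2)];
                String sql = "INSERT INTO t_order_" + (orderId % 32)
                        + " (order_id, user_id, amount) VALUES (?, ?, ?)";
                try (PreparedStatement ps = target.prepareStatement(sql)) {
                    ps.setLong(1, orderId);
                    ps.setLong(2, userId);
                    ps.setBigDecimal(3, rs.getBigDecimal("amount"));
                    ps.executeUpdate();
                }
            }
        }
    }
}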

2) Upgrade the slave library scheme

The upgrade-the-slave-library scheme generally targets systems whose online databases already have a master-slave replication structure. The idea is that when further expansion of the sub-databases is needed, the existing slave libraries can be promoted directly to masters. For example, suppose the original structure has libraries A and B as masters, with A0 and B0 as their respective slaves, as shown in the figure below:

If expansion is now required and two more libraries are needed, A0 and B0 can be promoted to masters, turning the original two libraries into four. At the same time, the sharding rule in the upper-layer service is changed: the data with uid%2=0 (originally in library A) is split into uid%4=0 and uid%4=2 and stored in A and A0 respectively, and the data with uid%2=1 (originally in library B) is split into uid%4=1 and uid%4=3 and stored in B and B0 respectively. Because A0 holds the same data as A, and B0 the same data as B, no data migration is required; only the sharding rule configuration of the service has to be changed. The resulting structure is as follows:

After the switch, data that used to live in two libraries is spread across four, but the old copies are still there: for example, the uid%4=2 rows that now logically belong to A0 still physically exist in library A, and the uid%4=0 rows still exist in A0. This redundant data needs to be cleared, but clearing it does not affect the consistency of online data and can be done at any time.
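The throwaway snippet below just does the modulo arithmetic for a hypothetical user_id, to show why no rows have to be moved: the library a row maps to under the new rule is either the one it already lives in or that library's former slave, which already holds a full copy.

public class RouteCheck {
    public static void main(String[] args) {
        long userId = 6;                                  // hypothetical user
        String oldLib = (userId % 2 == 0) ? "A" : "B";    // old rule: uid % 2
        String[] newLibs = {"A", "B", "A0", "B0"};        // new rule: uid % 4 -> A, B, A0, B0
        String newLib = newLibs[(int) (userId % 4)];
        // 6 % 2 = 0 -> library A under the old rule;
        // 6 % 4 = 2 -> library A0 under the new rule.
        // A0 was A's slave, so the row is already there; only the rule changes,
        // and the leftover copy in A becomes the redundant data described above.
        System.out.println("old=" + oldLib + ", new=" + newLib);
    }
}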

After this is done, to preserve high availability and allow further expansion, a new slave library can be assigned to each current master, as shown in the following figure:

The process of the upgrade-the-slave-library scheme is as follows:

1. Modify the service's data sharding configuration and work out the mapping between the new libraries and the old libraries;

2. The DBA promotes the existing slave libraries to masters;

3. The DBA removes the old master-slave replication relationships;

4. Clear the redundant data asynchronously;

5. Build new slave libraries for the new data nodes.

This scheme avoids the uncertainty of moving data, but it has its limits. First, the existing master-slave structure must be able to satisfy the new sub-database planning. Second, the main technical risk is shifted to the DBA, so there may be considerable resistance to implementing it; after all, the DBA may not be willing to take on that responsibility. Finally, because the storage structure of the database has to be changed online, unexpected situations can occur, and things get complicated if multiple applications share the same database instances.

3) Double-write scheme

Double-writing is a common method for online database migration, and expanding sub-databases and sub-tables also involves migrating data, so double-writing can likewise be used to assist the expansion.

The principle of the double-write scheme is similar to that of upgrading slave libraries: the split is arranged so that the amount of data migrated directly is reduced and the risk of inconsistency is lowered; the difference lies in how the data is synchronized.

The core steps of the double-write scheme are:

  • 1) Add the new databases directly, and add a write path in the original sharding logic so that every write produces two copies of the data;

  • 2) At the same time, gradually synchronize the historical data in the old libraries to the new libraries with a tool (a script, a program, etc.). This migration has no impact on the new libraries, because they only receive new writes while all other upper-layer application logic still runs against the old libraries;

  • 3) Verify the migrated data. Since the service writes both copies directly, consistency of the new data is very high (note, however, that INSERT, UPDATE and DELETE all have to be performed on both sides);

  • 4) After the migration is complete and data consistency has been verified, the sharding rule configuration can be modified at the application layer, following the same split as the old libraries.

As an example, the double-write scheme is shown below:

Data with uid%2=0 is stored in library A and data with uid%2=1 in library B. New libraries A0 and B0 are added: every write to library A is also written to A0, and every write to library B is also written to B0.
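Below is a minimal sketch of what the double-write looks like at the application layer, using plain JDBC. The DataSource names, table columns and the choice to write the old library first are assumptions for illustration; a real implementation would also double-write UPDATE and DELETE, as noted above.

import java.sql.*;
import javax.sql.DataSource;

/** Minimal double-write sketch: every write goes to the old library and to the new library. */
public class OrderDoubleWriter {
    private final DataSource oldDs;   // e.g. library A (uid % 2 rule)
    private final DataSource newDs;   // e.g. library A0, the newly added library

    public OrderDoubleWriter(DataSource oldDs, DataSource newDs) {
        this.oldDs = oldDs;
        this.newDs = newDs;
    }

    public void insertOrder(long orderId, long userId, java.math.BigDecimal amount) throws SQLException {
        String sql = "INSERT INTO t_order_" + (orderId % 32)
                + " (order_id, user_id, amount) VALUES (?, ?, ?)";
        // 1) Write the old library first, so existing reads are never affected.
        execute(oldDs, sql, orderId, userId, amount);
        // 2) Write the same row to the new library; a failure here should be logged
        //    and repaired by the later consistency check rather than ignored silently.
        execute(newDs, sql, orderId, userId, amount);
    }

    private void execute(DataSource ds, String sql, long orderId, long userId,
                         java.math.BigDecimal amount) throws SQLException {
        try (Connection conn = ds.getConnection();
             PreparedStatement ps = conn.prepareStatement(sql)) {
            ps.setLong(1, orderId);
            ps.setLong(2, userId);
            ps.setBigDecimal(3, amount);
            ps.executeUpdate();
        }
    }
}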

After that, the historical data of library A is migrated to library A0, and the historical data of library B to library B0, until the data in A and A0, and in B and B0, is consistent. The details are shown in the figure below:

Then the sharding rules are modified so that the data originally stored in library A with uid%2=0 is split into uid%4=0 and uid%4=2 and stored in A and A0 respectively, and the data originally stored in library B with uid%2=1 is split into uid%4=1 and uid%4=3 and stored in B and B0 respectively. The details are shown in the figure below:

The double-write scheme avoids the risk of changing the database topology that the upgrade-the-slave-library scheme carries, and it is easier for developers to control. On the other hand, it requires intrusive changes to the application code, plus the data migration and the removal of redundant data, so it is not trivial to implement either.

So is there an absolutely perfect solution?

The answer is no! None of the schemes can completely avoid touching the data: even the one that avoids migration, upgrading the slave libraries, still needs the extra step of deleting redundant data. What we can do is optimize how the data is migrated and how it is re-sharded. ShardingSphere, the well-known open source project in this field, essentially optimizes exactly these two aspects and provides a set of supporting tools.

Specifically, ShardingSphere is an open source solution consisting of three core components: **Sharding-JDBC + Sharding-Proxy + Sharding-Scaling**. Sharding-JDBC was introduced in the previous article, while Sharding-Proxy + Sharding-Scaling are the components designed to handle the data migration involved in expanding sub-databases and sub-tables. Their working principle is as follows:

As shown in the figure above, in the ShardingSphere solution, when the databases and tables behind Service(old) need to be expanded, we first deploy the Sharding-Scaling + Sharding-Proxy components to carry out data migration and re-sharding. The steps are as follows:

1) Configure the sharding rules in Sharding-Proxy according to the expansion plan and start the service. Sharding-Proxy provides a JDBC-compatible connection entry, and any data written through the Sharding-Proxy connection is stored according to the new sharding rules;

2) Deploy the Sharding-Scaling service and send it the migration task configuration through its HTTP interface (the configuration contains the connection string of the database to be migrated and the connection string of Sharding-Proxy);

3) After the Sharding-Scaling migration task is started, Sharding-Scaling reads the data of the data source being migrated, follows its binlog changes, and re-sends the rows to Sharding-Proxy, which re-shards them;

4) When the Sharding-Scaling migration task is finished, check the migration result. If everything is correct, modify the data sharding rules of Service(new) and replace Service(old);

5) After confirming that the expansion is correct, stop the Sharding-Proxy + Sharding-Scaling services;

6) Clear the redundant data asynchronously (ShardingSphere does not yet clear redundant data automatically after migration, so a clearing script has to be written according to the data splitting rules; a sketch of such a script is shown below).
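As a reference, here is a minimal sketch of such a clearing script for the uid%2 -> uid%4 split used in this article: after migration, the old library ds_0 should only keep user_id % 4 = 0 rows and ds_1 only user_id % 4 = 1 rows. The connection details are hypothetical, and in production the DELETE should be batched and rehearsed on a copy first.

import java.sql.*;

/** Deletes rows that no longer belong to an old library after the uid%2 -> uid%4 split. */
public class RedundantDataCleaner {
    public static void main(String[] args) throws SQLException {
        // Old library ds_0 should now keep only user_id % 4 = 0; everything else there is redundant.
        clean("jdbc:mysql://127.0.0.1:3306/order_0?useSSL=false", 0);
        // Old library ds_1 should now keep only user_id % 4 = 1.
        clean("jdbc:mysql://127.0.0.1:3306/order_1?useSSL=false", 1);
    }

    static void clean(String url, int expectedMod) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             Statement st = conn.createStatement()) {
            for (int i = 0; i < 32; i++) {   // 32 sub-tables per library
                int deleted = st.executeUpdate(
                        "DELETE FROM t_order_" + i + " WHERE user_id % 4 <> " + expectedMod);
                System.out.println(url + " t_order_" + i + ": removed " + deleted + " redundant rows");
            }
        }
    }
}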

The above process is fully automated: until the data migration and re-sharding are finished, the old service keeps serving traffic and online business is not affected. Moreover, because Sharding-Scaling keeps watching the binlog of the data source being migrated, data that is still written through the old service during the switchover is also picked up and re-sharded by Sharding-Proxy automatically, so there is no need to worry about inconsistency.

In addition, Sharding-Scaling talks to Sharding-Proxy over a native JDBC connection and the synchronization is binlog-based, so migration efficiency is also guaranteed. Next, the online expansion of sub-databases and sub-tables without stopping the service will be demonstrated based on the Sharding-Scaling + Sharding-Proxy scheme.

Online expansion of ShardingSphere sub-databases and sub-tables

Take the order sub-database sub-table storage from the previous article as an example. The original planning was: 1) 2 database nodes (ds_0, ds_1); 2) 32 sub-tables per library (t_order_0 to t_order_31). The new planning after expansion is: 1) 4 database nodes (ds_0, ds_1, ds_2, ds_3); 2) still 32 sub-tables per library (t_order_0 to t_order_31).

First, deploy Sharding-Scaling + Sharding-Proxy for online data migration and re-sharding, as follows:

1) Deploy Sharding-Proxy

This service acts as database middleware: after the new sub-database and sub-table rules are configured on it, Sharding-Scaling writes the original data to Sharding-Proxy, and Sharding-Proxy routes each row and writes it to the corresponding database and table.

We can download the ShardingSphere source code from GitHub (the version used in this demo is 4.1.1) and compile it ourselves, or download a pre-built release. This demonstration uses a source build; the executable package is located at:

shardingsphere-4.1.1/sharding-distribution/sharding-proxy-distribution/target/apache-shardingsphere-4.1.1-sharding-proxy-bin.tar.gz

Find the compiled package and unpack it. Then edit the **conf/server.yaml** file and add the connection account configuration as follows:

authentication:
  users:
    root:
      password: 123456

This account is mainly used by Sharding-Scaling when it connects to Sharding-Proxy. Next, edit Sharding-Proxy's sharding configuration file **conf/config-sharding.yaml** and configure it according to the new expansion plan, as follows:

# database name exposed to the outside
schemaName: order

dataSources:
  ds_0:
    url: jdbc:mysql://127.0.0.1:3306/order_0?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300
  ds_1:
    url: jdbc:mysql://127.0.0.1:3306/order_1?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300
  ds_2:
    url: jdbc:mysql://127.0.0.1:3306/order_2?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300
  ds_3:
    url: jdbc:mysql://127.0.0.1:3306/order_3?serverTimezone=UTC&useSSL=false
    username: root
    password: 123456
    connectionTimeoutMilliseconds: 30000
    idleTimeoutMilliseconds: 60000
    maxLifetimeMilliseconds: 1800000
    maxPoolSize: 300

shardingRule:
  tables:
    t_order:
      actualDataNodes: ds_${0..3}.t_order_$->{0..31}
      databaseStrategy:
        inline:
          shardingColumn: user_id
          algorithmExpression: ds_${user_id % 4}
      tableStrategy:
        inline:
          shardingColumn: order_id
          algorithmExpression: t_order_${order_id % 32}
      keyGenerator:
        type: SNOWFLAKE
        column: id

After the sharding configuration is done, start the Sharding-Proxy service with the following command:

sh bin/start.sh 

To verify that Sharding-Proxy is deployed correctly, connect to it with the mysql client and insert a row to check whether it is stored according to the new sharding rules:

mysql -h 127.0.0.1 -P 3307 -uroot -p123456

mysql> show databases;
+----------+
| Database |
+----------+
| order    |
+----------+
1 row in set (0.02 sec)

# After the connection succeeds, execute the following insert
mysql> insert into t_order values('d8d5e92550ba49d08467597a5263205b',10001,'topup',100,'CNY','2','3','1010010101',63631722,now(),now(),'test sharding-proxy');
Query OK, 1 row affected (0.20 sec)

Since user_id -> 63631722 % 4 = 2 and order_id -> 10001 % 32 = 17, the test record should land in table t_order_17 of ds_2. If it does, the new sharding rules are configured correctly.

2) Deploy Sharding-Scaling

The Sharding-Scaling executable package compiled from the source code is located at:

shardingsphere-4.1.1/sharding-distribution/sharding-scaling-distribution/target/apache-shardingsphere-4.1.1-sharding-scaling-bin.tar.gz

After unpacking it, start the Sharding-Scaling service with the following command:

sh bin/start.sh 

Sharding-Scaling is an independent data migration service; it is not tied to any specific environment, and the concrete information is passed in through its interface when a migration task is created. Next we call Sharding-Scaling to create a concrete migration task.

Before this, the data in the original sub-databases is as follows:

1) userId=63631725, orderId=123458, stored in table t_order_2 of ds_1;
2) userId=63631722, orderId=123457, stored in table t_order_1 of ds_0;

Under the new rules, record 1 does not need to be migrated, while record 2 has to be migrated to table t_order_1 of ds_2. The command to create the Sharding-Scaling migration task is as follows:

# submit the data migration task for order_0
curl -X POST --url http://localhost:8888/shardingscaling/job/start \
--header 'content-type: application/json' \
--data '{
  "ruleConfiguration": {
    "sourceDatasource": "ds_0: !!org.apache.shardingsphere.orchestration.core.configuration.YamlDataSourceConfiguration\n  dataSourceClassName: com.zaxxer.hikari.HikariDataSource\n  properties:\n    jdbcUrl: jdbc:mysql://127.0.0.1:3306/order_0?serverTimezone=UTC&useSSL=false&zeroDateTimeBehavior=convertToNull\n    driverClassName: com.mysql.jdbc.Driver\n    username: root\n    password: 123456\n    connectionTimeout: 30000\n    idleTimeout: 60000\n    maxLifetime: 1800000\n    maxPoolSize: 100\n    minPoolSize: 10\n    maintenanceIntervalMilliseconds: 30000\n    readOnly: false\n",
    "sourceRule": "tables:\n  t_order:\n    actualDataNodes: ds_0.t_order_$->{0..31}\n    keyGenerator:\n      column: order_id\n      type: SNOWFLAKE",
    "destinationDataSources": {
      "name": "dt_1",
      "password": "123456",
      "url": "jdbc:mysql://127.0.0.1:3307/order?serverTimezone=UTC&useSSL=false",
      "username": "root"
    }
  },
  "jobConfiguration": {
    "concurrency": 1
  }
}'

The above submits the data migration task for the old library order_0. If the submission succeeds, Sharding-Scaling returns the following message:

{"success":true,"errorCode":0,"errorMsg":null,"model":null}

The task progress can be queried with the following command, whose output looks like this:

# curl http://localhost:8888/shardingscaling/job/progress/1
{
  "success": true,
  "errorCode": 0,
  "errorMsg": null,
  "model": {
    "id": 1,
    "jobName": "Local Sharding Scaling Job",
    "status": "RUNNING",
    "syncTaskProgress": [{
      "id": "127.0.0.1-3306-order_0",
      "status": "SYNCHRONIZE_REALTIME_DATA",
      "historySyncTaskProgress": [{
        "id": "history-order_0-t_order_0#0",
        "estimatedRows": 0,
        "syncedRows": 0
      }, {
        "id": "history-order_0-t_order_1#0",
        "estimatedRows": 1,
        "syncedRows": 1
      },
      ... entries for t_order_2#0 through t_order_31#0 follow, all with estimatedRows 0 and syncedRows 0 ...
      ],
      "realTimeSyncTaskProgress": {
        "id": "realtime-order_0",
        "delayMillisecond": 8759,
        "logPosition": {
          "filename": "mysql-bin.000001",
          "position": 190285,
          "serverId": 0
        }
      }
    }]
  }
}

Migration tasks for the other old libraries (for example order_1) can be submitted in the same way. If you look at record 2 at this point, you will find it has been re-sharded into table t_order_1 of ds_2; however, the old copy is still left over in table t_order_1 of ds_0 and needs to be cleaned up later.
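A small JDBC check along these lines can be used to confirm both points (order_2 and order_0 are the physical databases behind ds_2 and ds_0; the credentials and column names are assumed from the earlier examples):

import java.sql.*;

public class MigrationCheck {
    public static void main(String[] args) throws SQLException {
        // The migrated record should now exist in ds_2.t_order_1 ...
        System.out.println("in ds_2.t_order_1: " +
                count("jdbc:mysql://127.0.0.1:3306/order_2?useSSL=false", 123457));
        // ... while the old copy still sits (redundantly) in ds_0.t_order_1 until it is cleaned up.
        System.out.println("still in ds_0.t_order_1: " +
                count("jdbc:mysql://127.0.0.1:3306/order_0?useSSL=false", 123457));
    }

    static long count(String url, long orderId) throws SQLException {
        try (Connection conn = DriverManager.getConnection(url, "root", "123456");
             PreparedStatement ps = conn.prepareStatement(
                     "SELECT COUNT(*) FROM t_order_1 WHERE order_id = ?")) {
            ps.setLong(1, orderId);
            try (ResultSet rs = ps.executeQuery()) {
                rs.next();
                return rs.getLong(1);
            }
        }
    }
}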

Suppose that at this moment a record with userId=63631723 and orderId=123457 is written through the old service. It will be stored in table t_order_1 of ds_1, while under the new rule it belongs in table t_order_1 of ds_3. As long as the Sharding-Scaling migration task for the order_1 database has also been started, this record will automatically be migrated and re-sharded into table t_order_1 of ds_3.

After the data migration is complete, the sharding rules of the old service can be adjusted and the service republished, as follows:

# Print SQL in the console (for development)
spring.shardingsphere.props.sql.show=true

# Data source names
spring.shardingsphere.datasource.names=ds0,ds1,ds2,ds3

# Data source 1
spring.shardingsphere.datasource.ds0.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds0.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds0.url=jdbc:mysql://127.0.0.1:3306/order_0?characterEncoding=utf-8
spring.shardingsphere.datasource.ds0.username=root
spring.shardingsphere.datasource.ds0.password=123456

# Data source 2
spring.shardingsphere.datasource.ds1.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds1.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds1.url=jdbc:mysql://127.0.0.1:3306/order_1?characterEncoding=utf-8
spring.shardingsphere.datasource.ds1.username=root
spring.shardingsphere.datasource.ds1.password=123456

# Data source 3
spring.shardingsphere.datasource.ds2.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds2.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds2.url=jdbc:mysql://127.0.0.1:3306/order_2?characterEncoding=utf-8
spring.shardingsphere.datasource.ds2.username=root
spring.shardingsphere.datasource.ds2.password=123456

# Data source 4
spring.shardingsphere.datasource.ds3.type=com.alibaba.druid.pool.DruidDataSource
spring.shardingsphere.datasource.ds3.driver-class-name=com.mysql.jdbc.Driver
spring.shardingsphere.datasource.ds3.url=jdbc:mysql://127.0.0.1:3306/order_3?characterEncoding=utf-8
spring.shardingsphere.datasource.ds3.username=root
spring.shardingsphere.datasource.ds3.password=123456

# t_order data nodes
spring.shardingsphere.sharding.tables.t_order.actual-data-nodes=ds$->{0..3}.t_order_$->{0..31}

# t_order database strategy (inline sharding algorithm), now modulo 4
spring.shardingsphere.sharding.tables.t_order.database-strategy.inline.sharding-column=user_id
spring.shardingsphere.sharding.tables.t_order.database-strategy.inline.algorithm-expression=ds$->{user_id % 4}

# t_order table strategy
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.sharding-column=order_id
spring.shardingsphere.sharding.tables.t_order.table-strategy.inline.algorithm-expression=t_order_$->{order_id % 32}

The above describes and demonstrates in detail how to use Sharding-Scaling + Sharding-Proxy to migrate data when a system that already has sub-databases and sub-tables is expanded a second time. For a system that has not yet split its database and tables but needs to, the process is no different from this second expansion, so it is not repeated here.

Other issues to note in sub-database and sub-table practice

In sub-database and sub-table practice, attention should also be paid to the primary key of the table. Generally, a distributed ID generation scheme (UUID, snowflake IDs and so on) should be considered, so that primary keys do not collide during expansion and migration. In addition, there are some caveats around Sharding-Scaling + Sharding-Proxy: Sharding-Scaling is currently still an alpha-stage component, so you will run into rough edges in use, and reading its source code helps a lot in understanding it.
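For example, instead of relying on per-table auto-increment, the application can generate a globally unique key itself; a trivial UUID-based sketch is shown below. Note that for numeric sharding columns such as order_id in this article, a numeric scheme like SNOWFLAKE (which ShardingSphere generates via the keyGenerator configuration above) is more suitable, since the inline expressions do modulo arithmetic.

import java.util.UUID;

public class OrderIdGenerator {
    /** Returns a 32-character globally unique id, safe to use across all shards. */
    public static String nextId() {
        return UUID.randomUUID().toString().replace("-", "");
    }

    public static void main(String[] args) {
        System.out.println(nextId());   // e.g. d8d5e92550ba49d08467597a5263205b
    }
}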

Finally, due to limited space, specific methods for cleaning up historical data are not demonstrated in the expansion case above. If you have better approaches in practice, you are welcome to share them with me! That is all this article wants to cover; I hope it is useful to you.

Reprinted from the public account: Incomparable code nong