Background: more than 800 million rows stored in TiDB (which is MySQL-compatible) need to be exported to Elasticsearch 7.

At first I thought, "Let's use the venerable ELK stack for the data migration!" Logstash is a data collector that filters and formats data and ships it to Elasticsearch. Its JDBC input plugin means that any data source with a JDBC interface can be used as input.

Logstash is an open-source data collection engine with real-time pipeline capabilities. Simply put, Logstash serves as a bridge between data sources and data storage and analysis tools. Combined with Elasticsearch and Kibana, it greatly simplifies data processing and analysis. With more than 200 plugins, Logstash accepts almost any kind of data, including logs, network requests, relational databases, sensors, and the Internet of Things.

The Logstash process

Besides installing Logstash on the ES cluster machines (which have already been tuned for performance), you need to write the Logstash configuration file and the ES7 index template, which I'll post when I get to DataX.

logstash.conf

input {
  jdbc {
    jdbc_driver_library => "/opt/jar/mysql-connector-java-5.1.48.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://xxx:4000/ODS_MES_FinishedProd_B400?characterEncoding=utf8&useSSL=false"
    jdbc_user => "xxx"
    jdbc_password => "xxx"
    jdbc_paging_enabled => "true"
    lowercase_column_names => "false"
    jdbc_page_size => "1000"
    statement => "SELECT LOTID,LOTNO,PRODUCTIONORDERID,PARTID FROM LOT"
  }
}
output {
  elasticsearch {
    hosts => ["xxx:9200","xxx:9200","xxx:9200"]
    index => "lot"
    template_name => "lot_template"
  }
  stdout { codec => rubydebug }
}

After running ./logstash -f /opt/file/logstash.conf, the speed seemed fine at first: it imported 1.6 million rows in the first 10 minutes (later a colleague from the offline data team told me that was far too slow!). At that rate I expected about 9.6 million rows after an hour, but fewer than 6 million had arrived, and in the hours after that the count grew by only a few hundred thousand rows. I was shocked!

Later, when I searched the community, I found that a full import with Logstash runs into the deep-paging bottleneck, so it is not suitable for importing large volumes. The community has a strategy for rewriting slow paging SQL, but I thought the SQL was too complex to write and I did not have the patience at the time, so I did not dig into it. (I'll look at deep paging later when I have time.)
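To make the deep-paging bottleneck concrete, here is a minimal sketch that contrasts OFFSET paging with keyset paging (remembering the last key seen). It uses Python's built-in sqlite3 and a toy table rather than TiDB; the table name lot, the column lotid, and the function names are assumptions for illustration only. With OFFSET, the database generally has to walk past every skipped row for each page, so later pages get slower; keyset paging can seek straight to the next key through the primary-key index.

```python
import sqlite3


def make_db(n_rows):
    """Create an in-memory toy table with ids 1..n_rows."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE lot (lotid INTEGER PRIMARY KEY, lotno TEXT)")
    conn.executemany(
        "INSERT INTO lot (lotid, lotno) VALUES (?, ?)",
        ((i, f"NO{i}") for i in range(1, n_rows + 1)),
    )
    return conn


def offset_pages(conn, page_size):
    """Page with LIMIT/OFFSET: each page skips all rows before it."""
    offset = 0
    while True:
        rows = conn.execute(
            "SELECT lotid, lotno FROM lot ORDER BY lotid LIMIT ? OFFSET ?",
            (page_size, offset),
        ).fetchall()
        if not rows:
            return
        yield rows
        offset += page_size


def keyset_pages(conn, page_size):
    """Page by remembering the last key; assumes ids are positive."""
    last_id = 0
    while True:
        rows = conn.execute(
            "SELECT lotid, lotno FROM lot WHERE lotid > ? ORDER BY lotid LIMIT ?",
            (last_id, page_size),
        ).fetchall()
        if not rows:
            return
        yield rows
        last_id = rows[-1][0]


conn = make_db(2500)
by_offset = [row for page in offset_pages(conn, 1000) for row in page]
by_keyset = [row for page in keyset_pages(conn, 1000) for row in page]
print(len(by_offset), by_offset == by_keyset)  # prints: 2500 True
```

Both strategies return the same rows; the difference only shows up in how much work each page costs on a very large table.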

Now for the core of this article: a colleague in the offline data team pointed out that DataX could be used for the migration. According to him, their team uses DataX to move 300 million rows per hour. At first I had thought this was just a data import that would not take long to prepare, and I did not expect that after learning Logstash I would have to switch tools and start from the beginning. Alas, that is what happens when you do not research the tools properly.

Okay, so let’s move on to the Datax part

First, download the source code of Alibaba's open-source DataX, then select only the modules you need so that compilation does not take too long. There is no way around compiling it, so here are the steps for building DataX:

  1. First configure the Aliyun mirror repository in the Maven settings file, otherwise the POM dependencies will not download!
<mirrors>
  <mirror>
    <id>alimaven</id>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
    <mirrorOf>central</mirrorOf>
  </mirror>
  <mirror>
    <id>alimaven</id>
    <mirrorOf>central</mirrorOf>
    <name>aliyun maven</name>
    <url>http://maven.aliyun.com/nexus/content/repositories/central</url>
  </mirror>
  <mirror>
    <id>nexus-aliyun</id>
    <mirrorOf>central</mirrorOf>
    <name>Nexus aliyun</name>
    <url>https://maven.aliyun.com/repository/central</url>
  </mirror>
  <!-- Maven private server for Alicloud -->
  <mirror>
    <id>repo1</id>
    <mirrorOf>central</mirrorOf>
    <name>Human Readable Name for this Mirror.</name>
    <url>http://repo1.maven.org/maven2/</url>
  </mirror>
  <mirror>
    <id>repo2</id>
    <mirrorOf>central</mirrorOf>
    <name>Human Readable Name for this Mirror.</name>
    <url>http://repo2.maven.org/maven2/</url>
  </mirror>
  <!-- Central repository -->
</mirrors>
  2. Then comment out the unneeded modules in the DataX pom.xml (note: do not remove the required modules) to shorten compilation time. Here I kept only the mysqlreader and elasticsearchwriter plugin modules:
<modules>
    <module>common</module>
    <module>core</module>
    <module>transformer</module>
    <module>mysqlreader</module>
    <module>elasticsearchwriter</module>

    <!-- reader -->
    <!--
    <module>drdsreader</module>
    <module>sqlserverreader</module>
    <module>postgresqlreader</module>
    <module>kingbaseesreader</module>
    <module>oraclereader</module>
    <module>odpsreader</module>
    <module>otsreader</module>
    <module>otsstreamreader</module>
    <module>txtfilereader</module>
    <module>hdfsreader</module>
    <module>streamreader</module>
    <module>ossreader</module>
    <module>ftpreader</module>
    <module>mongodbreader</module>
    <module>rdbmsreader</module>
    <module>hbase11xreader</module>
    <module>hbase094xreader</module>
    <module>tsdbreader</module>
    <module>opentsdbreader</module>
    <module>cassandrareader</module>
    <module>gdbreader</module>
    -->

    <!-- writer -->
    <!--
    <module>drdswriter</module>
    <module>odpswriter</module>
    <module>txtfilewriter</module>
    <module>ftpwriter</module>
    <module>hdfswriter</module>
    <module>streamwriter</module>
    <module>otswriter</module>
    <module>oraclewriter</module>
    <module>sqlserverwriter</module>
    <module>postgresqlwriter</module>
    <module>kingbaseeswriter</module>
    <module>osswriter</module>
    <module>mongodbwriter</module>
    <module>adswriter</module>
    <module>ocswriter</module>
    <module>rdbmswriter</module>
    <module>hbase11xwriter</module>
    <module>hbase094xwriter</module>
    <module>hbase11xsqlwriter</module>
    <module>hbase11xsqlreader</module>
    <module>elasticsearchwriter</module>
    <module>tsdbwriter</module>
    <module>adbpgwriter</module>
    <module>gdbwriter</module>
    <module>cassandrawriter</module>
    <module>clickhousewriter</module>
    <module>oscarwriter</module>
    -->

    <!-- common support module -->
    <module>plugin-rdbms-util</module>
    <module>plugin-unstructured-storage-util</module>
    <module>hbase20xsqlreader</module>
    <module>hbase20xsqlwriter</module>
    <module>kuduwriter</module>
</modules>

In CMD, change to the DataX directory and run mvn -U clean package assembly:assembly -Dmaven.test.skip=true. In PowerShell, -Dmaven.test.skip=true must be wrapped in single quotes.

  3. After a successful build, a datax.tar.gz appears under /DataX/target. Move the package to the Linux machine and extract it. At this point we have taken a giant leap in getting started with DataX!

Next comes the job configuration file, tidb2es.json. Note that ES7 allows only one type, so type is set to _doc:

{
    "job": {
        "setting": {
            "speed": {
                "channel": 32,
                "batchSize": 4096
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "splitPk": "LOTID",
                        "connection": [
                            {
                                "querySql": [
                                    "SELECT LOTID,LOTNO,PRODUCTIONORDERID,DATE_FORMAT(CREATEDDATE,'%Y-%m-%d %H:%i:%S') as CREATEDDATE FROM LOT"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://xxx:4000/ODS_MES_FinishedProd_B400?useUnicode=true&characterEncoding=utf-8"
                                ]
                            }
                        ],
                        "username": "xxx",
                        "password": "xxx"
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://xxx:9200",
                        "index": "lot",
                        "type": "_doc",
                        "settings": {"index": {"number_of_shards": 3, "number_of_replicas": 1, "refresh_interval": "120s"}},
                        "discovery": false,
                        "column": [
                          { "name": "LOTID", "type": "keyword" },
                          { "name": "LOTNO", "type": "keyword" },
                          { "name": "PRODUCTIONORDERID", "type": "keyword" },
                          { "name": "CREATEDDATE", "type": "date" }
                        ]
                    }
                }
            }
        ]
    }
}
  4. With the configuration file written, I excitedly went to the Linux machine to try it out: python /datax/bin/datax.py --jvm="-Xms8G -Xmx8G" tidb2es.json

What followed was a series of errors:

  1. Caused by: java.lang.IllegalArgumentException: Preemptive authentication set without credentials

The fix is to comment out .setPreemptiveAuth(new HttpHost(endpoint)) in ESClient.java, recompile, then package and upload the result to replace /datax/plugins/writer/elasticsearchwriter.

Path: \DataX-master\DataX-master\elasticsearchwriter\src\main\java\com\alibaba\datax\plugin\writer\elasticsearchwriter\ESClient.java

Builder httpClientConfig = new HttpClientConfig
        .Builder(endpoint)
        // .setPreemptiveAuth(new HttpHost(endpoint))
        .multiThreaded(multiThread)
        .connTimeout(30000)
        .readTimeout(readTimeout)
        .maxTotalConnection(200)
        .requestCompressionEnabled(compression)
        .discoveryEnabled(discovery)
        .discoveryFrequency(5l, TimeUnit.MINUTES);
  2. ERROR RetryUtil - Exception when calling callable, Msg:No Server is assigned to client to connect

The cause is that discovery must be set to false in tidb2es.json. When node discovery is enabled, the client polls the cluster and periodically updates its list of servers.

  3. Rejecting mapping update to [lot] as the final mapping would have more than 1 type: [_doc, lot]

DataX reported this error because I had not set type in tidb2es.json. I assumed the index template already defined the type, and ES7 allows only one type per index. The error said I had set two types, which confused me.

I then tried setting type to _doc in tidb2es.json as well, and it succeeded! If type is not set in tidb2es.json, the writer automatically generates a type with the same name as the index, which leaves two different types (_doc and lot).

PUT _template/lot_template
{
  "template": "lot*",
  "order": 1,
  "settings": {
    "number_of_shards": 3,
    "number_of_replicas": 1,
    "refresh_interval": "120s"
  },
  "mappings": {
    "properties": {
      "LOTID": {
        "type": "keyword",
        "doc_values": false
      },
      "LOTNO": {
        "type": "keyword",
        "doc_values": false
      },
      "PRODUCTIONORDERID": {
        "type": "keyword"
      },
      "CREATEDDATE": {
        "type": "date",
        "format": "yyyy-MM-dd HH:mm:ss",
        "index": false
      }
    }
  }
}
  4. GC life time is shorter than transaction duration, transaction starts at 2021-03-26 19:58:12.823 +0800 CST, GC safe point is 2021-03-26 20:00:03.523 +0800 CST

TiDB's GC life time is shorter than the duration of my long-running read transaction, so the data was cleaned up before the read finished. My team leader told me that the TiDB vendor was also loading a large amount of initialization data at the time, so my process was killed outright during GC. He suggested that I migrate manually, batch by batch, using WHERE conditions (I thought this method was too clumsy), so I haven't made adjustments here yet... (Updates to come, stay tuned!)

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — I am a gorgeous line — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Update: last time I mentioned running the migration manually in batches. My heart refused, but my body honestly went and searched for a way to do it, haha.

If you want to run in batches, you need to use limit, offset:

-- Plain LIMIT with a large offset: MySQL performs very badly here.
SELECT * FROM tableName LIMIT 500000000, 60000000;
-- Fetch 60 million rows starting at row 500 million, using a subquery to locate the starting id.
SELECT * FROM tableName WHERE id >= (SELECT id FROM tableName ORDER BY ID LIMIT 500000000, 1) LIMIT 60000000;

Why 60 million? In my previous failed runs, the errors appeared at around 60 million rows. I had been using the first SQL, which must have hit the deep-paging bottleneck: the server waited so long that the unprocessed data was garbage-collected.

Then I started nine servers using the second SQL, each handling a 60-million-row increment ordered by LOTID. It is running well now, excellent! The Logstash problem mentioned above can be solved the same way: with the second SQL, the later pages should no longer be a problem.

Since the LOT table in MySQL is still in active use, we plan to use Logstash for simple incremental synchronization of the data.

— — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — I am a gorgeous line — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — — —

Another update: the "GC life time is shorter than transaction duration" error appeared again.

On reflection, the problem is that the query takes longer and longer the deeper I read. After discussing it with my team leader, I suspected the index was not being used, but after searching I found nothing in my SQL that would prevent index use.

The problem may lie in my subquery. I can look up the corresponding LOTID value separately and then write that value directly into the SQL.

Besides, ORDER BY sorts the retrieved data again, which is also very resource-intensive. So the query becomes: SELECT * FROM tableName WHERE LOTID >= 200000000 and LOTID < 300000000.

In addition, I kept wondering: was my job actually running single-threaded? I had written the whole SQL in querySql, but the official templates I saw list the columns in column and then enable splitPk.

So I made another big change to the JSON. I split the SQL statement into column, table, and where entries, so that splitPk can use the specified field to shard the data and actually enable concurrent reads!
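The explicit range condition can be generated for every batch instead of being typed by hand. The following is a minimal sketch, assuming LOTID is numeric (as the range condition in this post implies); the function name range_predicates and the 50-million step are hypothetical choices for illustration. Ranges are half-open, so no row is read twice or skipped at a boundary.

```python
def range_predicates(start, stop, step, column="LOTID"):
    """Return WHERE fragments covering [start, stop) in half-open chunks of `step`."""
    if step <= 0:
        raise ValueError("step must be positive")
    predicates = []
    lo = start
    while lo < stop:
        hi = min(lo + step, stop)  # the last chunk may be shorter
        predicates.append(f"{column} >= {lo} and {column} < {hi}")
        lo = hi
    return predicates


for predicate in range_predicates(200_000_000, 300_000_000, 50_000_000):
    print(predicate)
# prints:
# LOTID >= 200000000 and LOTID < 250000000
# LOTID >= 250000000 and LOTID < 300000000
```

Note that equal id ranges do not guarantee equal row counts if the ids have gaps, so batch sizes may vary.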

The final template

{
    "job": {
        "setting": {
            "speed": {
                "channel": 32,
                "batchSize":4096
            }
        },
        "content": [
            {
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "splitPk": "LOTID",
                        "connection": [
                            {
                                "table": [
                                    "LOT"
                                ],
                                "jdbcUrl": [
                                    "jdbc:mysql://xxx:4000/xxx?useUnicode=true&characterEncoding=utf-8"
                                ]
                            }
                        ],
                        "where": "LOTID >= 200000000 and LOTID < 250000000",
                        "username": "xxx",
                        "password": "xxx",
                        "column": [
                            "LOTID",
                            "LOTNO",
                            "DATE_FORMAT(CREATEDDATE,'%Y-%m-%d %H:%i:%S') as CREATEDDATE"
                        ]
                    }
                },
                "writer": {
                    "name": "elasticsearchwriter",
                    "parameter": {
                        "endpoint": "http://xxx:9200",
                        "index": "lot",
                        "type":"_doc",
                        "settings": {"index" :{"number_of_shards": 3, "number_of_replicas": 1, "refresh_interval": "120s"}},
                        "discovery": false,
                        "dynamic": true,
                        "column": [
                          { "name": "LOTID", "type": "keyword"},
                          { "name": "LOTNO","type": "keyword" },
                          { "name": "CREATEDDATE","type": "date" }
                        ]
                    }
                }
            }
        ]
    }
}
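Since only the where value changes between batches, one job file per range can be derived from the template. Below is a minimal sketch of that idea; the function name job_for_range, the output file names, and the trimmed stand-in template are assumptions for illustration, and in practice the full template above would be loaded instead.

```python
import copy
import json


def job_for_range(template, lo, hi, column="LOTID"):
    """Return a copy of the job template whose reader covers [lo, hi)."""
    job = copy.deepcopy(template)  # never mutate the shared template
    reader_param = job["job"]["content"][0]["reader"]["parameter"]
    reader_param["where"] = f"{column} >= {lo} and {column} < {hi}"
    return job


# Trimmed stand-in for the full template; only the fields touched here are kept.
template = {
    "job": {
        "content": [
            {"reader": {"name": "mysqlreader",
                        "parameter": {"splitPk": "LOTID", "where": ""}}}
        ]
    }
}

batches = [(200_000_000, 250_000_000), (250_000_000, 300_000_000)]
jobs = {
    f"tidb2es_{lo}_{hi}.json": job_for_range(template, lo, hi)  # hypothetical file names
    for lo, hi in batches
}

for name, job in jobs.items():
    where = job["job"]["content"][0]["reader"]["parameter"]["where"]
    print(name, "->", where)
    # To write the file: json.dump(job, open(name, "w"), indent=4)
```

Each generated file can then be passed to datax.py in the same way as tidb2es.json.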

I captured the DataX job report: it wrote more than 100,000 records per second, so I estimate about 300 million records per hour. Problem solved.

Start time: 2021-03-29 10:19:05
End time: 2021-03-29 10:25:57
Total task time: 411s
Average task traffic: 23.76MB/s
Record write speed: 119267rec/s
Number of read records: 48899522
Number of read/write failures: 0
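As a rough sanity check on these numbers, the sketch below recomputes the throughput from the report and compares it with the first ten minutes of the Logstash run. It assumes the rates stay constant for the whole migration, which the Logstash run clearly did not manage, and it uses the 800 million total from the start of this post; the function names are made up for illustration.

```python
def rows_per_second(rows, seconds):
    """Average throughput over a run."""
    return rows / seconds


def hours_for(total_rows, rate):
    """Hours needed to move total_rows at a constant rate (rows/s)."""
    return total_rows / rate / 3600


# Figures taken from the DataX report and the Logstash observation in this post.
datax_rate = rows_per_second(48_899_522, 411)
logstash_rate = rows_per_second(1_600_000, 10 * 60)

total = 800_000_000  # assumed total row count
print(f"DataX:    {datax_rate:,.0f} rows/s, {hours_for(total, datax_rate):.1f} h for 800M rows")
print(f"Logstash: {logstash_rate:,.0f} rows/s, {hours_for(total, logstash_rate):.1f} h for 800M rows")
```

Even at its initial speed, Logstash would have needed days for the full table, while DataX at the measured rate finishes in a couple of hours.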

To sum up: when migrating data in a real production environment, you cannot just copy the configuration from a few web articles and start writing. You do not know why the author set the parameters that way, and if your business background differs even slightly, problems will appear.

The best approach is to check the official documentation. Only when you know what each configuration field means can you really get the job done and master the knowledge.