Overview

In production systems, there is often a need to synchronize MySQL data to ES. If a high degree of customization is required, a dedicated synchronization program is usually written to process the data; but when there are no special business requirements, the official Logstash has clear advantages. Before adopting Logstash, we should understand its characteristics and then decide whether to use it:

  • No development is needed; just install and configure Logstash;
  • Data is fetched via SQL, so the synchronization logic is expressed as a SQL query;
  • Supports either full synchronization on every run or incremental synchronization based on a specific field (such as an auto-increment ID or a modification time);
  • The synchronization frequency is configurable, but the fastest schedule is once per minute (use with caution if near-real-time freshness is required);
  • Physical deletes cannot be propagated: rows physically deleted in MySQL are not deleted in ES (add a logical-delete field such as IsDelete to the table design to mark rows as deleted; see the sketch below).
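A minimal sketch of the logical-delete approach, using the DetailTab table and the KeyId/ModifyTime columns from the configuration in section 2.2; the IsDelete column and the key value 10086 are illustrative assumptions, not from the original article:

-- Add a logical-delete flag instead of physically deleting rows
ALTER TABLE `DetailTab` ADD COLUMN `IsDelete` TINYINT NOT NULL DEFAULT 0;

-- "Delete" a row: updating ModifyTime lets the incremental sync pick up the change,
-- so the document in ES receives the IsDelete flag and can be filtered on the ES side
UPDATE `DetailTab` SET `IsDelete` = 1, `ModifyTime` = NOW() WHERE `KeyId` = 10086;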

1. Installation

Download Logstash from the official site at www.elastic.co/downloads/l… ; the zip package is about 160 MB (a mirror is also available from zxiaofan on CSDN). Unzip it, for example to G:\ELK\logstash-6.5.4 on Windows or /tomcat/logstash/logstash-6.5.4 on Linux. In the text below, [program directory] refers to the installation directory of the respective environment.
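A minimal sketch of the Linux install, assuming the /tomcat/logstash directory used above; the exact artifact URL is an assumption, so verify the current link on the Elastic download page before running:

# Download and unpack Logstash 6.5.4 (URL and paths are illustrative)
mkdir -p /tomcat/logstash && cd /tomcat/logstash
wget https://artifacts.elastic.co/downloads/logstash/logstash-6.5.4.zip
unzip logstash-6.5.4.zip   # produces logstash-6.5.4/, the [program directory] on Linux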

2. Configuration

2.1. Create a directory to store the configuration file and mysql dependency package

Create a mysql directory under [program directory] and place the MySQL JDBC driver mysql-connector-java-5.1.34.jar in it. Then create a jdbc.conf file in the [program directory]\mysql directory. This file holds the core configuration: database connection information, the query SQL, paging settings, and the synchronization schedule. For details, see the comments in the example below.
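A minimal layout sketch of this step, assuming the Linux [program directory] from section 1; the driver jar source path is a placeholder:

cd /tomcat/logstash/logstash-6.5.4           # the [program directory]
mkdir -p mysql
cp /path/to/mysql-connector-java-5.1.34.jar mysql/
touch mysql/jdbc.conf                         # will hold the configuration shown in 2.2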

2.2. Single-table synchronization configuration

input {
    stdin {}
    jdbc {
        type => "jdbc"
        # Database connection address
        jdbc_connection_string => "jdbc:mysql://192.168.1.1:3306/TestDB?characterEncoding=UTF-8&autoReconnect=true"
        # Database user name and password
        jdbc_user => "username"
        jdbc_password => "pwd"
        # Path of the MySQL driver jar
        jdbc_driver_library => "mysql/mysql-connector-java-5.1.34.jar"
        # The name of the driver class for MySQL
        jdbc_driver_class => "com.mysql.jdbc.Driver"
        # Number of database reconnection attempts
        connection_retry_attempts => "3"
        # Validate the connection before use (default false, disabled)
        jdbc_validate_connection => "true"
        # Connection validation timeout, default 3600s
        jdbc_validation_timeout => "3600"
        # Enable paging queries (default false, disabled)
        jdbc_paging_enabled => "true"
        # Number of rows per page (default 100000)
        jdbc_page_size => "500"
        # statement is the query SQL; for complex SQL it is better to store it in a file
        # and reference it via statement_filepath;
        # :sql_last_value is a built-in variable holding the tracking_column value of the last
        # row of the previous run, here the last ModifyTime;
        # statement_filepath => "mysql/jdbc.sql"
        statement => "SELECT KeyId,TradeTime,OrderUserName,ModifyTime FROM `DetailTab` WHERE ModifyTime >= :sql_last_value ORDER BY ModifyTime ASC"
        # Whether to convert column names to lowercase (default true)
        lowercase_column_names => false
        # Value can be any of: fatal, error, warn, info, debug; default info
        sql_log_level => warn
        #
        # Whether to record the last execution result; if true, the tracking_column value of the
        # last row is saved to the file specified by last_run_metadata_path
        record_last_run => true
        # Track the value of a specific column; if false, tracking_column defaults to the run timestamp
        use_column_value => true
        # The column used for incremental synchronization; must appear in the query result
        tracking_column => "ModifyTime"
        # Value can be any of: numeric, timestamp; default "numeric"
        tracking_column_type => timestamp
        # Where record_last_run stores the last value
        last_run_metadata_path => "mysql/last_id.txt"
        # Whether to clear the last_run_metadata_path record; must be false for incremental sync
        clean_run => false
        #
        # Sync schedule (cron format: minute hour day month weekday); default once per minute
        schedule => "* * * * *"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
    # convert: change the data type of the field TotalMoney to float
    mutate {
        convert => { "TotalMoney" => "float" }
    }
}

output {
    elasticsearch {
        # host => "192.168.1.1"
        # port => "9200"
        # ES cluster addresses
        hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
        # Unique document ID (the database primary key KeyId is recommended)
        document_id => "%{KeyId}"
    }
    stdout {
        codec => json_lines
    }
}
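If the query SQL is complex, it can instead be stored in the file referenced by statement_filepath. A minimal sketch of mysql/jdbc.sql carrying the same query as the statement above:

SELECT KeyId, TradeTime, OrderUserName, ModifyTime
FROM `DetailTab`
WHERE ModifyTime >= :sql_last_value
ORDER BY ModifyTime ASC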

2.3. Multiple table synchronization

The difference between the multi-table and single-table configuration is that the input section contains one jdbc module per table, each with its own type, and the output section needs a matching branch for each of those types.

input {
    stdin {}
    jdbc {
        # When synchronizing multiple tables, each jdbc module must have its own type;
        # the recommended naming is "databasename_tablename"
        type => "TestDB_DetailTab"
        # ...
        # record_last_run: where the last value is stored
        last_run_metadata_path => "mysql\last_id.txt"
        # Must be false for incremental sync
        clean_run => false
        #
        # Sync schedule (cron format: minute hour day month weekday); default once per minute
        schedule => "* * * * *"
    }
    jdbc {
        # When synchronizing multiple tables, each jdbc module must have its own type;
        # the recommended naming is "databasename_tablename"
        type => "TestDB_Tab2"
        # Note: the last_run_metadata_path here must be different from the one above,
        # otherwise the two jdbc modules will overwrite each other's record
        # Other configuration omitted
        # ...
        # ...
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    # Each output branch must test the type of the corresponding jdbc input module
    if [type] == "TestDB_DetailTab" {
        elasticsearch {
            # host => "192.168.1.1"
            # port => "9200"
            # ES cluster addresses
            hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
            # Index name, must be all lowercase
            index => "testdb_detailtab"
            document_id => "%{KeyId}"
        }
    }
    if [type] == "TestDB_Tab2" {
        elasticsearch {
            # host => "192.168.1.1"
            # port => "9200"
            hosts => ["192.168.1.1:9200", "192.168.1.2:9200", "192.168.1.3:9200"]
            index => "testdb_tab2"
            document_id => "%{KeyId}"
        }
    }
    stdout {
        codec => json_lines
    }
}

3. Start and run

Run the following command in the program directory to start:

[Windows] bin\logstash.bat -f mysql\jdbc.conf
[Linux]   nohup ./bin/logstash -f mysql/jdbc_jx_moretable.conf &

You can create a script that wraps the startup command and run it directly. Runtime logs are written to the logs directory under the [program directory].
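For instance, a minimal Linux start script might look like the sketch below; the script name and paths are illustrative:

#!/bin/bash
# start_logstash.sh - minimal start script sketch
cd /tomcat/logstash/logstash-6.5.4                     # the [program directory]
nohup ./bin/logstash -f mysql/jdbc.conf > /dev/null 2>&1 &
echo "Logstash started; check the logs directory for output"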

Note: Logstash 5.x/6.x/7.x requires JDK 8. If the system default JDK is not JDK 8, add the following two environment variables at the beginning of bin/logstash or bin/logstash.lib.sh:

export JAVA_CMD="/usr/tools/jdk1.8.0_162/bin"
export JAVA_HOME="/usr/tools/jdk1.8.0_162/"

Automatic startup on boot:

  • Windows:
    • Option 1: use the Windows Task Scheduler.
    • Option 2: register Logstash as a Windows service with NSSM; see blog.csdn.net/u010887744/…
  • Linux:
    • On CentOS 7, register Logstash as a systemd service so that it starts on boot; see blog.csdn.net/u010887744/… (a unit sketch follows below).
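A minimal systemd unit sketch for CentOS 7, assuming the [program directory] is /tomcat/logstash/logstash-6.5.4 and the configuration from section 2; the unit name and paths are illustrative, see the linked post for a full walkthrough:

# /etc/systemd/system/logstash-mysql.service (illustrative name)
[Unit]
Description=Logstash MySQL to ES synchronization
After=network.target

[Service]
WorkingDirectory=/tomcat/logstash/logstash-6.5.4
ExecStart=/tomcat/logstash/logstash-6.5.4/bin/logstash -f mysql/jdbc.conf
Restart=on-failure

[Install]
WantedBy=multi-user.target

# Enable and start:
#   systemctl daemon-reload && systemctl enable logstash-mysql && systemctl start logstash-mysql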

4. Problems and solutions

4.1 After data synchronization, ES has no data

The index of the output.elasticsearch module must be all lowercase.

4.2. After incremental synchronization, the content of the last_run_metadata_path file does not change

When lowercase_column_names is not explicitly set to false (it defaults to true), the tracking_column value must be written in all lowercase.
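For example, either of these two sketches works with the ModifyTime column used above:

# Option 1: keep the default lowercase_column_names => true and track the lowercased name
tracking_column => "modifytime"

# Option 2: disable lowercasing and keep the original column name
lowercase_column_names => false
tracking_column => "ModifyTime"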

4.3. jdbc_driver_library cannot be found

2032 com.mysql.jdbc.Driver not loaded.
Are you sure you've included the correct jdbc driver in :jdbc_driver_library?

Check that the configured path is correct. On Linux, note that the path separator is a forward slash (/), not a backslash (\).

4.4. Data loss

If the comparison operator in the SQL statement is "greater than" (>), data may be lost. For example, if the value saved in last_run_metadata_path is 2019-01-30 20:45:30 and a newly added row also has an update time of 2019-01-30 20:45:30, that row will never be synchronized. Solution: use "greater than or equal to" (>=) for the comparison, as in the sketch below.
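A sketch of the two variants, using the table and columns from the configuration in section 2.2 (:sql_last_value stands for the value saved in last_run_metadata_path):

-- ">" loses rows whose ModifyTime equals the saved value:
SELECT KeyId, TradeTime, OrderUserName, ModifyTime FROM `DetailTab`
WHERE ModifyTime >  :sql_last_value ORDER BY ModifyTime ASC;

-- ">=" catches that boundary case (at the cost of re-reading boundary rows, see 4.5):
SELECT KeyId, TradeTime, OrderUserName, ModifyTime FROM `DetailTab`
WHERE ModifyTime >= :sql_last_value ORDER BY ModifyTime ASC;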

4.5 Repeated data updates

The solution to the previous problem ("data loss") is to use "greater than or equal to" in the comparison, but that creates a new problem. If the value saved in last_run_metadata_path is 2019-01-30 20:45:30 and the maximum update time in the database is also 2019-01-30 20:45:30, those rows are synchronized again on every run until a row with a later update time appears. When the number of such boundary rows is large and no new data is updated for a long time, a large amount of data is repeatedly pushed to ES. This happens when: ① the comparison field is not strictly increasing; ② rows are inserted by a program in batches with identical timestamps. Solutions:

  • ① Ensure that the comparison field is increasing and does not repeat, or repeats only with a very small probability (for example an auto-increment ID or a database-generated timestamp); this avoids most of the abnormal cases;
  • ② If a program really does insert large batches of rows with identical update times, and there may be no data updates for a long time, consider periodically updating a test row in the database so that the boundary value no longer matches a large amount of data (see the sketch below).
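A minimal sketch of solution ②, assuming a dedicated test row is reserved (the KeyId value 0 is purely illustrative); run it from a scheduled job:

-- Touch a dedicated test row so the maximum ModifyTime keeps advancing,
-- which stops Logstash from re-reading the same boundary rows on every run.
UPDATE `DetailTab` SET `ModifyTime` = NOW() WHERE `KeyId` = 0;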

4.6. Disaster recovery

Logstash itself cannot be clustered; the ELK combination we commonly use achieves clustering indirectly through a Kafka cluster. Options: ① have a scheduled-task program push data to Kafka and let Kafka feed the data into ES, but the task program itself also needs disaster recovery, and duplicate pushes must be considered; ② run Logstash under a daemon/supervisor and monitor its running state with a third-party tool. Which to choose depends on your own application scenario.

4.7 Mass data synchronization

Why is it slow? Logstash implements paging by wrapping the entire query in a derived (temporary) table and then applying LIMIT/OFFSET on that derived table. As a result, every paging query effectively performs a full scan of the main table:

SELECT * FROM (SELECT * FROM `ImageCN1`
 WHERE ModifyTime>= '1970-01-01 08:00:00'
 order by ModifyTime asc) AS `t1`
 LIMIT 5000 OFFSET 10000000;

If the amount of data is very large, how can the first full synchronization be done safely? Consider adding range conditions to the SQL statement, such as an ID range or a modification-time range, to reduce the amount of data each run has to synchronize. First verify the setup with a small data range, then adjust the range based on the test results and restart Logstash to complete the synchronization. For example, change the SQL to:

SELECT
	* 
FROM
	`ImageCN1` 
WHERE
	ModifyTime < '2018-10-10 10:10:10' AND ModifyTime >= '1970-01-01 08:00:00' 
ORDER BY
	ModifyTime ASC

After the data in the ModifyTime < '2018-10-10 10:10:10' range has been synchronized, modify the SQL to cover the remaining range. However, editing the SQL after each pass is cumbersome to operate in production. Can we keep the SQL unchanged and still synchronize efficiently? We can rewrite the SQL once more:

SELECT
	* 
FROM
	`ImageCN1` 
WHERE
	ModifyTime >= '1970-01-01 08:00:00' 
ORDER BY
	ModifyTime ASC 
	LIMIT 100000

In this way, each subquery is guaranteed to handle no more than 100,000 (10W) rows. Actual tests show that the effect is obvious when the data volume is large:

[SQL] USE XXXDataDB;
Affected rows: 0
Time: 0.001s

[SQL]
SELECT * FROM (SELECT * FROM `ImageCN1`
    WHERE ModifyTime >= '1970-01-01 08:00:00'
    ORDER BY ModifyTime ASC) AS `t1`
LIMIT 5000 OFFSET 900000;
Affected rows: 0
Time: 7.229s

[SQL]
SELECT * FROM (SELECT * FROM `ImageCN1`
    WHERE ModifyTime >= '2018-07-18 19:35:10'
    ORDER BY ModifyTime ASC LIMIT 100000) AS `t1`
LIMIT 5000 OFFSET 90000;

As the test log above shows, once LIMIT 100000 is added to the inner query, each paging subquery scans no more than 10W rows instead of the whole table, and the paging query becomes much faster.


Good luck!

Life is all about choices!

In the future, you will be grateful for your hard work now!
