Single table synchronization

Table structure

CREATE TABLE `sync_es`.`logstash_resource` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL COMMENT 'resource name',
  `description` varchar(100) NOT NULL COMMENT 'resource description',
  `create_time` bigint(20) NOT NULL COMMENT 'create_time',
  `update_time` bigint(20) NOT NULL COMMENT 'update_time',
  `delete_time` bigint(20) DEFAULT '0' COMMENT 'delete_time',
  PRIMARY KEY (`id`),
  KEY `update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Logstash configuration

input {
  jdbc {
    jdbc_driver_library => "/path-to-jar/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/sync_es"
    jdbc_user => root
    jdbc_password => "123456"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => numeric
    record_last_run => true
    last_run_metadata_path => "latest_update_time.txt"
    statement => "SELECT * FROM logstash_resource where update_time >= :sql_last_value;"
    schedule => "* * * * * *"
  }
}
filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "doc"
    index => "logstash_resource"
    hosts => ["http://localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}

Logstash configuration details

The following settings configure the MySQL connection.

jdbc_driver_library => "/path-to-jar/mysql-connector-java-8.0.26.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/sync_es"
jdbc_user => root
jdbc_password => "123456"

The following settings make Logstash use the update_time column as its synchronization checkpoint. The most recently recorded value is saved in the file named by last_run_metadata_path, and on each scheduled run that value is substituted for :sql_last_value in statement.

use_column_value => true
tracking_column => "update_time"
tracking_column_type => numeric
record_last_run => true
last_run_metadata_path => "latest_update_time.txt"

statement is the SQL query to run; the rows returned by the SELECT are stored in Elasticsearch.

statement => "SELECT * FROM logstash_resource where update_time >= :sql_last_value;"
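
The way the tracking settings and :sql_last_value work together can be approximated in plain Ruby. This is only a sketch of the behavior described above, not the plugin's implementation: the in-memory rows array stands in for the MySQL table, run_once and checkpoint are hypothetical names, and the checkpoint is kept in a variable instead of the last_run_metadata_path file.

```ruby
# In-memory stand-in for the logstash_resource table (assumption for illustration).
rows = [
  { 'id' => 1, 'update_time' => 1630748900 },
  { 'id' => 2, 'update_time' => 1630748902 }
]

# One scheduled run: select rows at or after the checkpoint, then advance the
# checkpoint. Simplification: this sketch takes the largest update_time seen.
def run_once(rows, checkpoint)
  selected = rows.select { |r| r['update_time'] >= checkpoint }
  new_checkpoint = selected.map { |r| r['update_time'] }.max || checkpoint
  [selected, new_checkpoint]
end

checkpoint = 0 # starting value for a numeric tracking column
first, checkpoint = run_once(rows, checkpoint)
puts "run 1: ids #{first.map { |r| r['id'] }}, checkpoint #{checkpoint}"

# A row is added after the first run; only rows at or past the checkpoint
# are picked up by the second run.
rows << { 'id' => 3, 'update_time' => 1630748905 }
second, checkpoint = run_once(rows, checkpoint)
puts "run 2: ids #{second.map { |r| r['id'] }}, checkpoint #{checkpoint}"
```

The second run skips row 1 because its update_time is older than the saved checkpoint, which is what keeps each run incremental.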

schedule sets how often the statement is executed and its results are stored in Elasticsearch. Here it runs once per second.

schedule => "* * * * * *"

Nested field synchronization

For example, if users need to search for resources by the role they hold on them, the nested field type is required.

New table structure

CREATE TABLE `sync_es`.`logstash_resource_role` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `resource_id` int(11) NOT NULL,
  `role_id` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `user_resource` (`user_id`,`resource_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Logstash configuration

input {
  jdbc {
    jdbc_driver_library => "/path-to-jar/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/sync_es"
    jdbc_user => root
    jdbc_password => "123456"
    clean_run => true
    use_column_value => true
    record_last_run => true
    tracking_column => "update_time"
    tracking_column_type => numeric
    last_run_metadata_path => "last_update_time.txt"
    schedule => "* * * * * *"
    statement => "
      SELECT
        logstash_resource.id AS id,
        logstash_resource.name AS name,
        logstash_resource.description AS description,
        logstash_resource.create_time AS create_time,
        logstash_resource.update_time AS update_time,
        logstash_resource.delete_time AS delete_time,
        logstash_resource_role.user_id AS user_id,
        logstash_resource_role.role_id AS role_id
      FROM logstash_resource
      LEFT JOIN logstash_resource_role
        ON logstash_resource.id = logstash_resource_role.resource_id
      WHERE logstash_resource.update_time >= :sql_last_value;"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['name'] = event.get('name')
      map['description'] = event.get('description')
      map['create_time'] = event.get('create_time')
      map['update_time'] = event.get('update_time')
      map['delete_time'] = event.get('delete_time')
      map['user_role'] ||= []
      if (event.get('user_id') != nil)
        map['user_role'].delete_if{|x| x['user_id'] == event.get('user_id')}
        map['user_role'] << {
          'user_id' => event.get('user_id'),
          'role_id' => event.get('role_id'),
        }
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}
output {
  stdout {
    #codec => json_lines
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash_resource"
    document_id => "%{id}"
  }
}

The statement returns a result such as the following:

+----+-------+-------------+-------------+-------------+-------------+---------+---------+
| id | name  | description | create_time | update_time | delete_time | user_id | role_id |
+----+-------+-------------+-------------+-------------+-------------+---------+---------+
|  1 | name1 | description |  1630744743 |  1630748902 |           0 |       1 |       2 |
|  1 | name1 | description |  1630744743 |  1630748902 |           0 |       2 |       3 |
|  2 | name2 | description |  1630744744 |  1630748902 |           0 |       3 |       3 |
+----+-------+-------------+-------------+-------------+-------------+---------+---------+

The aggregate configuration works as follows:

  1. push_previous_map_as_event => true: each time the aggregate plugin encounters a new id, it pushes the previously aggregated map as an event, which is stored in Elasticsearch, and then creates a new map for the new id.
  2. timeout => 5: if no new event arrives within 5 s, the last aggregated map is pushed and stored in Elasticsearch.
  3. The original events are not processed any further, because the script ends by calling event.cancel().
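
The three points above can be approximated outside Logstash with a short plain-Ruby sketch. It is not the aggregate plugin itself. It assumes the rows arrive grouped by id (the statement above has no ORDER BY, so that ordering is an assumption), it carries only a subset of the columns for brevity, and emitted, current_id and the flush at the end are hypothetical stand-ins for the plugin's event push and timeout.

```ruby
# Rows as returned by the statement (values taken from the example result above).
rows = [
  { 'id' => 1, 'name' => 'name1', 'user_id' => 1, 'role_id' => 2 },
  { 'id' => 1, 'name' => 'name1', 'user_id' => 2, 'role_id' => 3 },
  { 'id' => 2, 'name' => 'name2', 'user_id' => 3, 'role_id' => 3 }
]

emitted = []      # stands in for the events sent on to Elasticsearch
current_id = nil
map = nil

rows.each do |event|
  # A new id arrived: push the previous map as an event and start a new map
  # (the role of push_previous_map_as_event => true).
  if event['id'] != current_id
    emitted << map unless map.nil?
    current_id = event['id']
    map = {}
  end

  # Same steps as the code option of the aggregate filter.
  map['id'] = event['id']
  map['name'] = event['name']
  map['user_role'] ||= []
  unless event['user_id'].nil?
    map['user_role'].delete_if { |x| x['user_id'] == event['user_id'] }
    map['user_role'] << { 'user_id' => event['user_id'], 'role_id' => event['role_id'] }
  end
  # The original row is dropped here, as event.cancel() does in the filter.
end

# No more rows: flush the last map (the filter does this once timeout expires).
emitted << map unless map.nil?

emitted.each { |doc| puts doc.inspect }
```

If rows for the same id were interleaved with rows for another id, the map would be pushed early and a later push for the same id would overwrite it with fewer user_role entries, so the grouping assumption matters.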

Before running Logstash, create the index mapping as follows:

curl -X PUT -H 'Content-Type: application/json' -d '
{
    "mappings": {
         "properties" : {
             "user_role" : {
                 "type" : "nested",
                 "properties" : {
                     "user_id" : { "type" : "long" },
                     "role_id" : { "type" : "long" }
                 }
             }
         }
    }
}' 'http://localhost:9200/logstash_resource'
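
With user_role mapped as nested, a search by user and role can use a nested query. The sketch below only builds the request body in Ruby. The user_id and role_id values are hypothetical, and sending the body to the index's _search endpoint is left out.

```ruby
require 'json'

# Hypothetical search: resources on which user 1 holds role 2.
user_id = 1
role_id = 2

# Both term clauses sit inside one nested query, so they must match the same
# user_role entry rather than values taken from two different entries.
query = {
  'query' => {
    'nested' => {
      'path' => 'user_role',
      'query' => {
        'bool' => {
          'must' => [
            { 'term' => { 'user_role.user_id' => user_id } },
            { 'term' => { 'user_role.role_id' => role_id } }
          ]
        }
      }
    }
  }
}

body = JSON.pretty_generate(query)
puts body
```

Without the nested mapping, the arrays of user_id and role_id values would be flattened per document, and a resource could match on a user from one entry and a role from another.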

For the example result above, the two documents stored in Elasticsearch are as follows:

[
  {
    "_index": "logstash_resource",
    "_type": "_doc",
    "_id": "2",
    "_score": 1,
    "_source": {
      "description": "description",
      "update_time": 1630748902,
      "delete_time": 0,
      "create_time": 1630744744,
      "user_role": [
        { "user_id": 3, "role_id": 3 }
      ],
      "name": "name2",
      "id": 2
    }
  },
  {
    "_index": "logstash_resource",
    "_type": "_doc",
    "_id": "1",
    "_score": 1,
    "_source": {
      "description": "description",
      "update_time": 1630748902,
      "delete_time": 0,
      "create_time": 1630744743,
      "user_role": [
        { "user_id": 2, "role_id": 3 }
      ],
      "name": "name1",
      "id": 1
    }
  }
]

Logstash synchronization problem

  1. If the SQL condition uses >, suppose the largest update_time synchronized so far is 1630744741. If a row with update_time 1630744741 is inserted afterwards, it is never written to Elasticsearch. If the SQL condition uses >=, then on the next run the rows whose update_time is 1630744741 are synchronized again. (Here update_time is a second-level timestamp; using a millisecond-level timestamp alleviates this problem to some extent.)
  2. Updates to nested field data in the logstash_resource_role table must also be reflected in the logstash_resource table, because it is the update_time column there that triggers the re-synchronization of both resource data and role data.
  3. All code for this article is in the project logstash-sync-mysql-to-es. If you find it useful, you can give the project a star.
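
The trade-off in the first point can be reproduced with a small plain-Ruby sketch. It is an illustration under assumptions, not Logstash code: the table array, select_rows and the checkpoint handling are hypothetical stand-ins, and the timestamp is the one used in the example above.

```ruby
# Row already in the table when the first run happens (hypothetical data).
table = [{ 'id' => 1, 'update_time' => 1630744741 }]

# Select rows by comparing update_time to the checkpoint with '>' or '>='.
def select_rows(rows, checkpoint, operator)
  rows.select { |r| r['update_time'].public_send(operator, checkpoint) }
end

results = {}
[:>, :>=].each do |operator|
  rows = table.dup
  checkpoint = 0
  synced = [] # ids sent to Elasticsearch, in order, repeats included

  # First scheduled run.
  batch = select_rows(rows, checkpoint, operator)
  synced.concat(batch.map { |r| r['id'] })
  checkpoint = batch.map { |r| r['update_time'] }.max || checkpoint

  # A second row is written within the same second, after the first run.
  rows << { 'id' => 2, 'update_time' => 1630744741 }

  # Second scheduled run.
  batch = select_rows(rows, checkpoint, operator)
  synced.concat(batch.map { |r| r['id'] })

  results[operator] = synced
  puts "#{operator}: synced ids #{synced.inspect}"
end
```

With > the late row 2 is never selected, while with >= row 1 is sent twice. Because the configurations above set document_id to the row id, the repeated row overwrites the same document instead of creating a duplicate.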
