Single table synchronization
Table structure
CREATE TABLE `sync_es`.`logstash_resource` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(100) NOT NULL COMMENT 'resource name',
  `description` varchar(100) NOT NULL COMMENT 'resource description',
  `create_time` bigint(20) NOT NULL COMMENT 'create_time',
  `update_time` bigint(20) NOT NULL COMMENT 'update_time',
  `delete_time` bigint(20) DEFAULT '0' COMMENT 'delete_time',
  PRIMARY KEY (`id`),
  KEY `update_time` (`update_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Logstash configuration
input {
  jdbc {
    jdbc_driver_library => "/path-to-jar/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/sync_es"
    jdbc_user => "root"
    jdbc_password => "123456"
    use_column_value => true
    tracking_column => "update_time"
    tracking_column_type => numeric
    record_last_run => true
    last_run_metadata_path => "latest_update_time.txt"
    statement => "SELECT * FROM logstash_resource WHERE update_time >= :sql_last_value;"
    schedule => "* * * * * *"
  }
}
filter {
  mutate {
    remove_field => ["@version", "@timestamp"]
  }
}
output {
  elasticsearch {
    document_id => "%{id}"
    document_type => "doc"
    index => "logstash_resource"
    hosts => ["http://localhost:9200"]
  }
  stdout {
    codec => rubydebug
  }
}
Logstash configuration details
The following items configure the MySQL connection:
jdbc_driver_library => "/path-to-jar/mysql-connector-java-8.0.26.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://localhost:3306/sync_es"
jdbc_user => "root"
jdbc_password => "123456"
The following items tell Logstash to use the `update_time` column to track the last synchronized position, persisting the value in the file given by `last_run_metadata_path`. On each scheduled run, the previously recorded value is substituted for `:sql_last_value` in the statement.
use_column_value => true
tracking_column => "update_time"
tracking_column_type => numeric
record_last_run => true
last_run_metadata_path => "latest_update_time.txt"
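As an aside, the file named by `last_run_metadata_path` is where Logstash persists the tracked value between runs, serialized as a small YAML document. After a run it looks roughly like this (the exact value shown is illustrative):

```yaml
--- 1630748902
```

Deleting this file (or setting `clean_run => true`) resets the tracked position, causing a full re-sync on the next run.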
The statement option is the SQL query; the rows returned by this SELECT are what get stored in Elasticsearch.
statement => "SELECT * FROM logstash_resource where update_time >= :sql_last_value;"
The schedule option controls how often the statement is executed and its result written to Elasticsearch; the cron expression below runs it once per second.
schedule => "* * * * * *"
Nested field synchronization
For example, if users need to search for resources by role, the nested field feature is required.
New table structure
CREATE TABLE `sync_es`.`logstash_resource_role` (
  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,
  `user_id` int(11) NOT NULL,
  `resource_id` int(11) NOT NULL,
  `role_id` int(11) NOT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `user_resource` (`user_id`,`resource_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Logstash configuration
input {
  jdbc {
    jdbc_driver_library => "/path-to-jar/mysql-connector-java-8.0.26.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/sync_es"
    jdbc_user => "root"
    jdbc_password => "123456"
    clean_run => true
    use_column_value => true
    record_last_run => true
    tracking_column => "update_time"
    tracking_column_type => numeric
    last_run_metadata_path => "last_update_time.txt"
    schedule => "* * * * * *"
    statement => "
      SELECT
        logstash_resource.id AS id,
        logstash_resource.name AS name,
        logstash_resource.description AS description,
        logstash_resource.create_time AS create_time,
        logstash_resource.update_time AS update_time,
        logstash_resource.delete_time AS delete_time,
        logstash_resource_role.user_id AS user_id,
        logstash_resource_role.role_id AS role_id
      FROM logstash_resource
      LEFT JOIN logstash_resource_role
        ON logstash_resource.id = logstash_resource_role.resource_id
      WHERE logstash_resource.update_time >= :sql_last_value;"
  }
}
filter {
  aggregate {
    task_id => "%{id}"
    code => "
      map['id'] = event.get('id')
      map['name'] = event.get('name')
      map['description'] = event.get('description')
      map['create_time'] = event.get('create_time')
      map['update_time'] = event.get('update_time')
      map['delete_time'] = event.get('delete_time')
      map['user_role'] ||= []
      if (event.get('user_id') != nil)
        map['user_role'].delete_if{|x| x['user_id'] == event.get('user_id')}
        map['user_role'] << {
          'user_id' => event.get('user_id'),
          'role_id' => event.get('role_id'),
        }
      end
      event.cancel()
    "
    push_previous_map_as_event => true
    timeout => 5
  }
}
output {
  stdout {
    # codec => json_lines
  }
  elasticsearch {
    hosts => ["127.0.0.1:9200"]
    index => "logstash_resource"
    document_id => "%{id}"
  }
}
The result of executing the statement might look like the following:
+----+-------+-------------+-------------+-------------+-------------+---------+---------+
| id | name | description | create_time | update_time | delete_time | user_id | role_id |
+----+-------+-------------+-------------+-------------+-------------+---------+---------+
| 1 | name1 | description | 1630744743 | 1630748902 | 0 | 1 | 2 |
| 1 | name1 | description | 1630744743 | 1630748902 | 0 | 2 | 3 |
| 2 | name2 | description | 1630744744 | 1630748902 | 0 | 3 | 3 |
+----+-------+-------------+-------------+-------------+-------------+---------+---------+
The aggregate filter configuration is analyzed as follows:
- `push_previous_map_as_event => true`: whenever the aggregate plugin encounters a new `id`, it pushes the previously aggregated `map` as an `event` into Elasticsearch, then creates a fresh `map` for the new `id`.
- `timeout => 5`: if no new `event` arrives within 5s, the last aggregated `map` is pushed and stored in Elasticsearch.
- The original `event`s are not forwarded, because `event.cancel()` is executed at the end of the code block.
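The folding that the aggregate filter performs can be sketched in plain Python. This is an illustration, not Logstash code; the sample rows mirror the statement output table above:

```python
# Plain-Python sketch of what the aggregate filter does: rows sharing an id
# are folded into one document whose user_role field becomes a nested list.
rows = [
    {"id": 1, "name": "name1", "user_id": 1, "role_id": 2},
    {"id": 1, "name": "name1", "user_id": 2, "role_id": 3},
    {"id": 2, "name": "name2", "user_id": 3, "role_id": 3},
]

docs = {}
for row in rows:
    doc = docs.setdefault(row["id"], {"id": row["id"], "name": row["name"], "user_role": []})
    if row["user_id"] is not None:
        # like map['user_role'].delete_if: keep only the newest entry per user
        doc["user_role"] = [r for r in doc["user_role"] if r["user_id"] != row["user_id"]]
        doc["user_role"].append({"user_id": row["user_id"], "role_id": row["role_id"]})

print(docs[1]["user_role"])
# [{'user_id': 1, 'role_id': 2}, {'user_id': 2, 'role_id': 3}]
```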
Before starting Logstash, you need to create the index mapping as follows:
curl -X PUT -H 'Content-Type: application/json' -d '
{
"mappings": {
"properties" : {
"user_role" : {
"type" : "nested",
"properties" : {
"user_id" : { "type" : "long" },
"role_id" : { "type" : "long" }
}
}
}
}
}' 'http://localhost:9200/logstash_resource'
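With this mapping in place, the role-based search mentioned earlier is expressed with a nested query. A sketch of the request body sent to `http://localhost:9200/logstash_resource/_search` (the `user_id`/`role_id` values are illustrative):

```json
{
  "query": {
    "nested": {
      "path": "user_role",
      "query": {
        "bool": {
          "must": [
            { "term": { "user_role.user_id": 1 } },
            { "term": { "user_role.role_id": 2 } }
          ]
        }
      }
    }
  }
}
```

Because `user_role` is mapped as `nested`, the two `term` clauses must match within the same array element, so a resource is only returned when one of its users actually holds that role.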
For the example statement above, the two documents stored in Elasticsearch are as follows:
[
  {
    "_index": "logstash_resource",
    "_type": "_doc",
    "_id": "2",
    "_score": 1,
    "_source": {
      "description": "description",
      "update_time": 1630748902,
      "delete_time": 0,
      "create_time": 1630744744,
      "user_role": [{"user_id": 3, "role_id": 3}],
      "name": "name2",
      "id": 2
    }
  },
  {
    "_index": "logstash_resource",
    "_type": "_doc",
    "_id": "1",
    "_score": 1,
    "_source": {
      "description": "description",
      "update_time": 1630748902,
      "delete_time": 0,
      "create_time": 1630744743,
      "user_role": [{"user_id": 2, "role_id": 3}],
      "name": "name1",
      "id": 1
    }
  }
]
Logstash synchronization problem
- If the SQL condition uses `>`: suppose the current maximum `update_time` is 1630744741, and afterwards another row with `update_time` 1630744741 is inserted; that row will never be synchronized to Elasticsearch. If the condition uses `>=`, rows whose `update_time` is 1630744741 will be synchronized again on the next run. (`update_time` here is a second-level timestamp; using a millisecond-level timestamp can alleviate this problem to some extent.)
- Updates to the nested field data in `logstash_resource_role` must be reflected in the `update_time` column of `logstash_resource`, because synchronization of both resource data and role data is triggered by `update_time`.
- All the code for this article is in the project logstash-sync-mysql-to-es; if you find it useful, please give the project a star.
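The `>` vs `>=` trade-off above can be simulated in a few lines of Python. This is an illustration; the timestamps are the ones from the example, and a dict keyed by id stands in for Elasticsearch's `document_id`-based upsert:

```python
# Simulate two scheduled runs. sql_last_value is the max update_time recorded
# after run 1; a row with the same timestamp arrives before run 2.
def run(rows, sql_last_value, op):
    if op == ">":
        return [r for r in rows if r["update_time"] > sql_last_value]
    return [r for r in rows if r["update_time"] >= sql_last_value]

table = [{"id": 1, "update_time": 1630744741}]
last = 1630744741                                    # recorded after run 1
table.append({"id": 2, "update_time": 1630744741})   # inserted later, same second

missed = run(table, last, ">")     # strict '>' skips the new row entirely
resynced = run(table, last, ">=")  # '>=' picks it up, but re-syncs id 1 too

print([r["id"] for r in missed])    # []
print([r["id"] for r in resynced])  # [1, 2]

# Re-syncing id 1 is harmless: document_id => "%{id}" makes each write an
# idempotent upsert, so the duplicate simply overwrites the same document.
es = {}
for r in resynced:
    es[r["id"]] = r                 # upsert keyed by id, like document_id
```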