1. Data synchronization
Elasticsearch is often paired with a relational database to provide search capabilities for a business, and Logstash is the officially supported middleware for synchronizing data between the two.
Synchronization between Elasticsearch and the relational database must meet the following conditions:
- The Elasticsearch `_id` field must be set to the MySQL `id` field to maintain the mapping between ES documents and relational database rows (in the configuration below this is done with `document_id => "%{id}"`).
- When MySQL data is updated, the update triggers synchronization; note, however, that synchronization overwrites the entire ES document rather than just the changed fields.
- When inserting or updating data in MySQL, it is recommended to maintain an update-time field that marks the change so the synchronization job can capture it (see the DDL sketch after this list).
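As a minimal sketch of such an update-time column, here is a hypothetical DDL for the `t1` table queried in the Logstash configuration below (only the column names `fid`, `fis_deleted`, and `fupdated_at` come from that configuration; the types and defaults are assumptions):

```sql
-- Hypothetical schema for t1; fupdated_at is refreshed on every write,
-- so an incremental sync job can pick up both inserts and updates.
CREATE TABLE t1 (
  fid         BIGINT      NOT NULL PRIMARY KEY,
  fis_deleted TINYINT(1)  NOT NULL DEFAULT 0,  -- soft-delete flag (see section 2)
  fupdated_at TIMESTAMP   NOT NULL DEFAULT CURRENT_TIMESTAMP
                          ON UPDATE CURRENT_TIMESTAMP
) ENGINE = InnoDB;
```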
The relevant component versions are as follows:
- MySQL: 8.0.24
- Elasticsearch: 7.15.2
- Logstash: 7.15.2 (provides the Elasticsearch output plugin for pushing data into ES)
2. Delete operations cannot be synchronized
When Logstash pushes MySQL data to ES, only insert and update operations are synchronized; delete operations are not propagated to the Elasticsearch database.
Solutions:
- Handle it with soft deletion. Soft-delete tables generally contain an `is_deleted` field, which can be filtered on at query time to exclude the affected documents. The data is then physically removed from both MySQL and Elasticsearch by a scheduled task (see the SQL sketch after this list).
- Handle it in the service layer. When deleting, use a transaction to propagate the delete to MySQL and Elasticsearch simultaneously. However, a transaction failure can result in data inconsistency.
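As a minimal sketch of the soft-delete flow, assuming the hypothetical `t1` schema above (the statements and the seven-day retention window are illustrative assumptions, not part of the original configuration):

```sql
-- Soft delete: flip the flag instead of removing the row.
-- fupdated_at changes automatically, so the sync job picks this row up
-- and the Logstash filter below turns it into an ES delete action.
UPDATE t1 SET fis_deleted = 1 WHERE fid = 42;

-- Scheduled cleanup task: physically remove rows that have been
-- soft-deleted for long enough that ES has already dropped them.
DELETE FROM t1
WHERE fis_deleted = 1
  AND fupdated_at < NOW() - INTERVAL 7 DAY;
```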
For soft deletion, you need to configure the logstash.conf file, for example:
```
input {
  jdbc {
    # jdbc_driver_library => "mysql-connector-java-8.0.27.jar"
    type => "jdbc"
    jdbc_connection_string => "jdbc:mysql://cos-mysql:3306/cosimcloud?characterEncoding=UTF-8&autoReconnect=true"
    jdbc_user => "root"
    jdbc_password => "xxx"
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    # Query the soft-delete flag alongside the id
    statement => "SELECT fid AS id, fis_deleted AS deleted FROM t1"
    connection_retry_attempts => "3"
    jdbc_validate_connection => "true"
    jdbc_validation_timeout => "600"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # Scheduled task: run once per minute
    schedule => "* * * * *"
    tracking_column => "fupdated_at"
    tracking_column_type => "timestamp"
    lowercase_column_names => false
  }
}
filter {
  if [deleted] {
    # Soft-deleted rows become delete actions against ES
    mutate { add_field => { "[@metadata][elasticsearch_action]" => "delete" } }
    mutate { remove_field => [ "deleted", "@version", "@timestamp" ] }
  } else {
    # Live rows are indexed (inserted or overwritten)
    mutate { add_field => { "[@metadata][elasticsearch_action]" => "index" } }
    mutate { remove_field => [ "deleted", "@version", "@timestamp" ] }
  }
}
output {
  elasticsearch {
    hosts => "es01:9200"
    user => "elastic"
    password => "elastic"
    ecs_compatibility => disabled
    manage_template => true
    template_overwrite => true
    template => "/usr/share/logstash/template/logstash-template.json"
    template_name => "logstash-mysql"
    index => "logstash-fmu"
    pipeline => "fmu-tag"
    action => "%{[@metadata][elasticsearch_action]}"
    document_id => "%{id}"
  }
}
```
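As written, the statement re-reads the entire table on every schedule tick. A common refinement (a sketch following the Elastic guide referenced below; these exact settings are an assumption, not part of the original configuration) is to filter on the tracking column using the JDBC input plugin's built-in `:sql_last_value` parameter:

```
# Inside the jdbc input block: only fetch rows changed since the last run.
use_column_value => true
tracking_column => "fupdated_at"
tracking_column_type => "timestamp"
statement => "SELECT fid AS id, fis_deleted AS deleted FROM t1 WHERE fupdated_at > :sql_last_value"
```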
Key points of this configuration:
- In the input jdbc statement, the soft-deletion field (aliased as `deleted`) must be included in the query
- In the filter, check `deleted` and use mutate to set the appropriate action in the event metadata
- In the elasticsearch output, set the `action` field from that metadata so that soft-deleted rows are deleted from ES
- The template cannot generate the index because of the action configuration (issue pending)
3. Related documents
How to keep Elasticsearch synchronized with a relational database using Logstash and JDBC