1. Synchronize data

Elasticsearch is often paired with a relational database to provide search capabilities for a business, and Logstash is the officially supported middleware for synchronizing data between the two.

To synchronize with Elasticsearch, the relational database must meet the following conditions:

  • The Elasticsearch _id field must be set to the id field in MySQL to maintain the mapping between ES documents and relational rows (a table sketch follows this list).
    • When MySQL updates some data, those operations trigger synchronization. Note, however, that the synchronization overwrites the entire ES document, not just the changed fields.
  • When inserting or updating data in MySQL, it is recommended to maintain an update-time field that marks each change so that Logstash can capture it.
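
To make both conditions concrete, here is a minimal sketch of a MySQL table that satisfies them, reusing the fid, fis_deleted, and fupdated_at columns that appear in the Logstash configuration below (the table definition itself is an assumption; all business columns are omitted):

CREATE TABLE t1 (
    fid         BIGINT     NOT NULL AUTO_INCREMENT PRIMARY KEY,  -- mapped to the ES _id via document_id => "%{id}"
    fis_deleted TINYINT(1) NOT NULL DEFAULT 0,                   -- soft-delete flag
    fupdated_at TIMESTAMP  NOT NULL DEFAULT CURRENT_TIMESTAMP
                ON UPDATE CURRENT_TIMESTAMP                      -- update-time field; refreshed automatically on every UPDATE
);

With ON UPDATE CURRENT_TIMESTAMP, no application code change is needed to keep the update-time field current.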

The relevant component versions are as follows:

  • MySQL: 8.0.24
  • Elasticsearch: 7.15.2
  • Logstash: 7.15.2 (provides the Elasticsearch output plugin used to push data into ES)

2. Delete operations cannot be synchronized

When Logstash pushes MySQL data to ES, only insert and update operations are synchronized; delete operations are not propagated to Elasticsearch.

Solution:

  1. Handle it with soft deletion. A soft-delete table generally contains an is_deleted field, which can be filtered at query time to exclude deleted documents. The flagged rows are then removed from both MySQL and Elasticsearch by a scheduled task (see the sketch after this list).
  2. Handle it in the service layer. When deleting, wrap the operation in a transaction so that the delete is applied to MySQL and Elasticsearch at the same time. However, a transaction error can still leave the two stores inconsistent.
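
As a sketch of the first approach, the service-level "delete" becomes an UPDATE that sets the flag, and a scheduled task later hard-deletes the flagged rows (the fid value and the 30-day retention window are hypothetical):

-- Application-level "delete": flag the row instead of removing it.
-- fupdated_at refreshes automatically, so Logstash captures the change
-- and routes the document to the delete action (see the config below).
UPDATE t1 SET fis_deleted = 1 WHERE fid = 42;

-- Scheduled purge: physically remove rows that were soft-deleted long
-- enough ago that the matching ES documents have already been deleted.
DELETE FROM t1
WHERE fis_deleted = 1
  AND fupdated_at < NOW() - INTERVAL 30 DAY;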

For soft deletion, you need to configure the logstash.conf file, for example:

input {
  jdbc {
    # jdbc_driver_library => "mysql-connector-java-8.0.27.jar"
    type => "jdbc"
    jdbc_connection_string => "jdbc:mysql://cos-mysql:3306/cosimcloud?characterEncoding=UTF-8&autoReconnect=true"
    jdbc_user => "root"
    jdbc_password => "xxx"
    jdbc_driver_class => "Java::com.mysql.cj.jdbc.Driver"
    statement => "SELECT fid AS id, fis_deleted AS deleted FROM t1"
    connection_retry_attempts => "3"
    jdbc_validate_connection => "true"
    jdbc_validation_timeout => "600"
    jdbc_paging_enabled => "true"
    jdbc_page_size => "5000"
    # Scheduled task: run once per minute
    schedule => "* * * * *"
    tracking_column => "fupdated_at"
    tracking_column_type => "timestamp"
    lowercase_column_names => false
  }
}

filter {
  if [deleted] {
    mutate { add_field => { "[@metadata][elasticsearch_action]" => "delete" } }
    mutate { remove_field => [ "deleted", "@version", "@timestamp" ] }
  } else {
    mutate { add_field => { "[@metadata][elasticsearch_action]" => "index" } }
    mutate { remove_field => [ "deleted", "@version", "@timestamp" ] }
  }
}

output {
  elasticsearch {
    hosts => "es01:9200"
    user => "elastic"
    password => "elastic"
    ecs_compatibility => disabled
    manage_template => true
    template_overwrite => true
    template => "/usr/share/logstash/template/logstash-template.json"
    template_name => "logstash-mysql"
    index => "logstash-fmu"
    pipeline => "fmu-tag"
    action => "%{[@metadata][elasticsearch_action]}"
    document_id => "%{id}"
  }
}
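
Note that tracking_column => "fupdated_at" only records a value between runs; in the Logstash JDBC input, the statement itself must reference the built-in :sql_last_value parameter (typically together with use_column_value => true) for the query to become incremental. A sketch of such a statement, assuming the same t1 columns:

-- Incremental variant of the statement above, run inside the jdbc input.
-- :sql_last_value is substituted by Logstash at runtime; fupdated_at must
-- be selected so that tracking_column can read it.
SELECT fid AS id, fis_deleted AS deleted, fupdated_at
FROM t1
WHERE fupdated_at > :sql_last_value
ORDER BY fupdated_at ASC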

Key points of this configuration:

  • The jdbc input's statement must query the soft-deletion field (aliased as deleted)
  • In filter, check the deleted field and use mutate to write the appropriate action into the event metadata
  • In the elasticsearch output, the action field reads that metadata to choose between index and delete for each document
    • The template may fail to generate the index because of the action configuration (pending investigation)

3. Related documents

How to keep Elasticsearch synchronized with a relational database using Logstash and JDBC