Resolving data mismatches between MySQL and Elasticsearch

The Logstash JDBC input plugin can only append: it reads new rows from the database and writes them incrementally to Elasticsearch. Rows in the source database, however, may later be updated or deleted, so the database and the search engine index drift out of sync.

Of course, if you have a development team, you can write code that updates the search engine whenever a row is deleted or updated. If that is not an option, try the following method.

Suppose there is a table named article. Its mtime column is defined with ON UPDATE CURRENT_TIMESTAMP, so mtime changes every time a row is updated.

mysql> desc article;
+-------------+--------------+------+-----+--------------------------------+-------+
| Field       | Type         | Null | Key | Default                        | Extra |
+-------------+--------------+------+-----+--------------------------------+-------+
| id          | int(11)      | NO   |     | 0                              |       |
| title       | mediumtext   | NO   |     | NULL                           |       |
| description | mediumtext   | YES  |     | NULL                           |       |
| author      | varchar(100) | YES  |     | NULL                           |       |
| source      | varchar(100) | YES  |     | NULL                           |       |
| content     | longtext     | YES  |     | NULL                           |       |
| status      | enum('Y','N')| NO   |     | 'N'                            |       |
| ctime       | timestamp    | NO   |     | CURRENT_TIMESTAMP              |       |
| mtime       | timestamp    | YES  |     | ON UPDATE CURRENT_TIMESTAMP    |       |
+-------------+--------------+------+-----+--------------------------------+-------+
9 rows in set (0.00 sec)

Add an mtime condition to the Logstash query

jdbc {
    jdbc_driver_library => "/usr/share/java/mysql-connector-java.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/cms"
    jdbc_user => "cms"
    jdbc_password => "password"
    schedule =>
    statement => "select * from article where mtime > :sql_last_value"
    use_column_value => true
    tracking_column => "mtime"
    tracking_column_type => "timestamp"
    record_last_run => true
    last_run_metadata_path => "/var/tmp/article-mtime.last"
}
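To make the effect of tracking mtime concrete, here is a minimal in-memory sketch of the polling logic. It is not Logstash code: the class MtimePollingSketch, its Article type, and the sample timestamps are hypothetical names and values chosen for illustration. It assumes the tracked value simply advances to the newest mtime seen in each poll.

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// In-memory model of mtime-based polling; a hypothetical sketch, not Logstash code.
class MtimePollingSketch {

    // One row of the article table, reduced to the columns that matter here.
    static final class Article {
        final int id;
        final Instant mtime; // may be null, like the nullable mtime column

        Article(int id, Instant mtime) {
            this.id = id;
            this.mtime = mtime;
        }
    }

    // Plays the role of the value persisted in last_run_metadata_path.
    private Instant lastValue = Instant.EPOCH;

    // Mirrors: select * from article where mtime > :sql_last_value
    List<Article> poll(List<Article> table) {
        List<Article> changed = new ArrayList<>();
        Instant newest = lastValue;
        for (Article row : table) {
            // A NULL mtime never satisfies "mtime > x" in SQL, so skip it.
            if (row.mtime != null && row.mtime.isAfter(lastValue)) {
                changed.add(row);
                if (row.mtime.isAfter(newest)) {
                    newest = row.mtime;
                }
            }
        }
        lastValue = newest; // the next poll starts after the newest mtime seen
        return changed;
    }

    public static void main(String[] args) {
        MtimePollingSketch poller = new MtimePollingSketch();
        List<Article> table = new ArrayList<>();
        table.add(new Article(1, Instant.parse("2024-01-01T00:00:00Z")));
        table.add(new Article(2, Instant.parse("2024-01-02T00:00:00Z")));

        System.out.println(poller.poll(table).size()); // first poll sees both rows
        System.out.println(poller.poll(table).size()); // nothing changed since then

        // An UPDATE bumps mtime, so the row is selected again.
        table.set(0, new Article(1, Instant.parse("2024-01-03T00:00:00Z")));
        System.out.println(poller.poll(table).size());

        // A DELETE leaves no row to select, so polling alone never notices it.
        table.remove(1);
        System.out.println(poller.poll(table).size());
    }
}
```

The last step shows why updates are covered by the mtime condition while deletions still need a separate mechanism.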

Create a recycle bin table. It records the ids of articles that were deleted from the database or disabled (status = 'N').

CREATE TABLE `elasticsearch_trash` (
  `id` int(11) NOT NULL,
  `ctime` timestamp NULL DEFAULT CURRENT_TIMESTAMP,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

Create triggers on the article table

DELIMITER $$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_UPDATE` BEFORE UPDATE ON `article` FOR EACH ROW
BEGIN
    -- When an article's status changes to 'N', queue it for deletion from the search engine.
    -- INSERT IGNORE is used because the id may already be in the recycle bin.
    IF NEW.status = 'N' THEN
        insert ignore into elasticsearch_trash(id) values(OLD.id);
    END IF;
    -- When the status changes back to 'Y', elasticsearch_trash may still hold a record
    -- for the article, which would cause it to be deleted by mistake.
    -- So remove that record from the recycle bin.
    IF NEW.status = 'Y' THEN
        delete from elasticsearch_trash where id = OLD.id;
    END IF;
END$$

CREATE DEFINER=`dba`@`%` TRIGGER `article_BEFORE_DELETE` BEFORE DELETE ON `article` FOR EACH ROW
BEGIN
    -- When an article is deleted, put its id into the search engine recycle bin.
    insert ignore into elasticsearch_trash(id) values(OLD.id);
END$$

DELIMITER ;
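The rules the two triggers enforce can be modelled in a few lines of Java. This is only an illustrative sketch: TrashTriggerSketch and its method names are hypothetical, and the recycle bin is a Set, so adding an id that is already queued is silently ignored (a plain INSERT against the primary key would raise a duplicate-key error instead).

```java
import java.util.Set;
import java.util.TreeSet;

// In-memory model of the two triggers; a hypothetical sketch, not MySQL code.
class TrashTriggerSketch {

    // Stands in for the elasticsearch_trash table (ids only).
    private final Set<Integer> trash = new TreeSet<>();

    // Mirrors article_BEFORE_UPDATE.
    void beforeUpdate(int id, char newStatus) {
        if (newStatus == 'N') {
            trash.add(id);    // disabled: queue for removal from the index
        }
        if (newStatus == 'Y') {
            trash.remove(id); // re-enabled: cancel a pending removal
        }
    }

    // Mirrors article_BEFORE_DELETE.
    void beforeDelete(int id) {
        trash.add(id);
    }

    // Ids that the cleanup job would still have to delete from the index.
    Set<Integer> pending() {
        return trash;
    }

    public static void main(String[] args) {
        TrashTriggerSketch triggers = new TrashTriggerSketch();
        triggers.beforeUpdate(1, 'N'); // article 1 disabled
        triggers.beforeUpdate(2, 'N'); // article 2 disabled
        triggers.beforeUpdate(1, 'Y'); // article 1 enabled again before cleanup ran
        triggers.beforeDelete(3);      // article 3 deleted
        System.out.println(triggers.pending()); // prints [2, 3]
    }
}
```

Article 1 never reaches the cleanup job because re-enabling it removed its pending entry, which is exactly the accidental deletion the second IF branch guards against.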

Next, write a simple shell script that runs every minute, reads the ids from the elasticsearch_trash table, and uses curl to call the Elasticsearch REST API to delete the corresponding documents.

You can also write a program for this. Here is an example of a Spring Boot scheduled task.
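The REST call involved can be sketched as follows, in Java to stay consistent with the rest of the article. Several things here are assumptions: the node address http://localhost:9200, the class and method names, and the typed URL layout /index/type/id, which matches older Elasticsearch versions that still use mapping types (newer versions use /index/_doc/id). The index "information" and type "article" are taken from the scheduled task in this article.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical sketch of the per-document DELETE call (requires Java 11+).
class TrashRestDeleteSketch {

    // Builds DELETE {baseUrl}/{index}/{type}/{id}; the URL layout is an assumption.
    static HttpRequest deleteRequest(String baseUrl, String index, String type, int id) {
        URI uri = URI.create(baseUrl + "/" + index + "/" + type + "/" + id);
        return HttpRequest.newBuilder(uri).DELETE().build();
    }

    // The recycle-bin row can be dropped once the document is gone (200)
    // or was never indexed in the first place (404).
    static boolean canForget(int statusCode) {
        return statusCode == 200 || statusCode == 404;
    }

    // Sends the request and returns the HTTP status code.
    static int send(HttpRequest request) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newHttpClient();
        return client.send(request, HttpResponse.BodyHandlers.discarding()).statusCode();
    }

    public static void main(String[] args) {
        HttpRequest request = deleteRequest("http://localhost:9200", "information", "article", 42);
        // Only the request is printed here; call send(request) when a node is reachable.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

A row should stay in elasticsearch_trash for any other status code, so that the next run retries the deletion.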

Entity

package cn.netkiller.api.domain.elasticsearch;

import java.util.Date;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.Id;
import javax.persistence.Table;

@Entity
@Table
public class ElasticsearchTrash {
    @Id
    private int id;

    @Column(columnDefinition = "TIMESTAMP DEFAULT CURRENT_TIMESTAMP")
    private Date ctime;

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public Date getCtime() {
        return ctime;
    }

    public void setCtime(Date ctime) {
        this.ctime = ctime;
    }
}

Repository

package cn.netkiller.api.repository.elasticsearch;

import org.springframework.data.repository.CrudRepository;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;

public interface ElasticsearchTrashRepository extends CrudRepository<ElasticsearchTrash, Integer> {
}

Scheduled task

package cn.netkiller.api.schedule;

import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.rest.RestStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import cn.netkiller.api.domain.elasticsearch.ElasticsearchTrash;
import cn.netkiller.api.repository.elasticsearch.ElasticsearchTrashRepository;

@Component
public class ScheduledTasks {
    private static final Logger logger = LoggerFactory.getLogger(ScheduledTasks.class);

    @Autowired
    private TransportClient client;

    @Autowired
    private ElasticsearchTrashRepository elasticsearchTrashRepository;

    public ScheduledTasks() {
    }

    // Runs every 60 seconds.
    @Scheduled(fixedRate = 1000 * 60)
    public void cleanTrash() {
        for (ElasticsearchTrash elasticsearchTrash : elasticsearchTrashRepository.findAll()) {
            // Delete the document from index "information", type "article".
            DeleteResponse response = client.prepareDelete("information", "article", String.valueOf(elasticsearchTrash.getId())).get();
            RestStatus status = response.status();
            logger.info("delete {} {}", elasticsearchTrash.getId(), status.toString());
            // Drop the recycle bin row once the document is gone or was never indexed.
            if (status == RestStatus.OK || status == RestStatus.NOT_FOUND) {
                elasticsearchTrashRepository.delete(elasticsearchTrash);
            }
        }
    }
}

The Spring Boot main class

package cn.netkiller.api;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}