This article describes how to use canal to incrementally synchronize MySQL data to Elasticsearch. (Note: incremental only!)

1. Introduction

1.1 Introduction to canal

Canal is a high-performance data synchronization system based on the MySQL binary log. It is widely used within Alibaba Group (including www.taobao.com) to provide a reliable, low-latency incremental data pipeline. GitHub address: github.com/alibaba/can…

Canal Server parses the MySQL binlog and subscribes to data changes, while Canal Client can broadcast those changes anywhere, for example to other databases or to Apache Kafka.

It has the following features:

  1. Support for all platforms.
  2. Fine-grained system monitoring, powered by Prometheus.
  3. Parsing of and subscription to the MySQL binlog in different ways, such as by GTID.
  4. High-performance, real-time data synchronization. (See Performance)
  5. HA and scalability for both Canal Server and Canal Client, powered by Apache ZooKeeper.
  6. Docker support.

Disadvantages:

Full synchronization is not supported; only incremental synchronization is.

Full wiki: github.com/alibaba/can…

1.2 Operating principle

The principle is simple:

  1. Canal emulates the MySQL slave interaction protocol, masquerades as a MySQL slave, and sends the dump protocol to the MySQL master.
  2. The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, canal).
  3. Canal parses the binary log objects (originally a raw byte stream) into its own data types.

As shown in the figure:

1.3 Synchronizing to es

An adapter is needed to synchronize data to es: the Canal Adapter. The latest version, 1.1.3, can be downloaded from github.com/alibaba/can… .

Currently, the adapter appears to support es 6.x only, not 7.x!

2. Preparation

2.1 es and JDK

To install es, refer to: www.dalaoyang.cn/article/78

To install the JDK, refer to: www.dalaoyang.cn/article/16

2.2 Installing the Canal Server

Download canal.deployer-1.1.3.tar.gz:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

Extract the archive:

tar -zxvf canal.deployer-1.1.3.tar.gz

Enter the extracted folder:

cd canal.deployer-1.1.3

Modify the conf/example/instance.properties file, paying attention mainly to the following:

  • canal.instance.master.address: database address, such as 127.0.0.1:3306
  • canal.instance.dbUsername: database user
  • canal.instance.dbPassword: database password

The full content is as follows:

#################################################
## mysql serverId, v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=

# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
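The canal.instance.filter.regex value decides which tables canal subscribes to. As a rough sketch of how a pattern such as .*\\..* behaves, the snippet below matches fully qualified schema.table names against a comma-separated list of patterns. It assumes that the doubled backslash in the properties file becomes a single backslash once the file is loaded, and that each pattern has to match the whole name. The helper table_matches and the table name other.orders are hypothetical; this is not canal's own filter code.

```python
import re


def table_matches(filter_value: str, qualified_name: str) -> bool:
    """Return True if schema.table matches any comma-separated pattern."""
    patterns = [p.strip() for p in filter_value.split(",") if p.strip()]
    return any(re.fullmatch(p, qualified_name) for p in patterns)


# Values as they look after the properties file is loaded (\\ becomes \).
all_tables = r".*\..*"     # every table in every schema
one_schema = r"test\..*"   # hypothetical: only tables in the "test" schema

print(table_matches(all_tables, "test.test"))
print(table_matches(one_schema, "test.test"))
print(table_matches(one_schema, "other.orders"))
```

Narrowing the pattern to the schema or tables you actually need keeps unrelated changes out of the pipeline.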

Go back to the canal.deployer-1.1.3 directory and start canal:

sh bin/startup.sh

View logs:

vi logs/canal/canal.log

To view a specific instance log:

vi logs/example/example.log
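Besides reading the logs, a quick way to confirm that canal-server is up is to check whether its port accepts TCP connections. The sketch below assumes canal-server listens on 127.0.0.1:11111, the address the adapter configuration in this article points to; is_port_open is a hypothetical helper name.

```python
import socket


def is_port_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Connection refused, timed out, or host unreachable.
        return False


# Assumption: canal-server runs locally on port 11111; adjust as needed.
print("canal-server reachable:", is_port_open("127.0.0.1", 11111))
```

An open port only shows that the process is listening, so the instance log is still the place to look for binlog connection errors.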

To shut down canal:

sh bin/stop.sh

2.3 Installing canal-adapter

Download canal.adapter-1.1.3.tar.gz:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz

Extract the archive:

tar -zxvf canal.adapter-1.1.3.tar.gz

Enter the extracted folder:

cd canal.adapter-1.1.3

Modify the conf/application.yml file, paying attention to the following. Because it is a YAML file, the dotted property names given here correspond to nested keys in the file:

  • server.port: canal-adapter port number
  • canal.conf.canalServerHost: canal-server address and port
  • canal.conf.srcDataSources.defaultDS.url: database address
  • canal.conf.srcDataSources.defaultDS.username: database user name
  • canal.conf.srcDataSources.defaultDS.password: database password
  • canal.conf.canalAdapters.groups.outerAdapters.hosts: es host address and TCP port

The full content is as follows:

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 12345678
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9300
        properties:
         cluster.name: elasticsearch

In addition, you need to configure the conf/es/*.yml files. The adapter automatically loads all configuration files ending in .yml in that directory. Before describing the configuration, here is the table structure used in this example:

CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

You need to create the index in es manually, for example with es-head, as shown in the following figure:

The test index structure is as follows:

{
    "mappings": {
        "_doc": {
            "properties": {
                "name": {
                    "type": "text"
                },
                "address": {
                    "type": "text"
                }
            }
        }
    }
}
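If es-head is not available, the same index can be created through the es REST API. The following is a minimal sketch, assuming es answers HTTP on 127.0.0.1:9200 (the usual default, not stated in this article) and runs a 6.x version that accepts the _doc type in the mapping. The function build_create_index_request is a hypothetical helper; it only builds the request, and sending it is left as a commented-out line.

```python
import json
import urllib.request

# Same mapping as shown above for the "test" index.
MAPPING = {
    "mappings": {
        "_doc": {
            "properties": {
                "name": {"type": "text"},
                "address": {"type": "text"},
            }
        }
    }
}


def build_create_index_request(base_url: str, index: str) -> urllib.request.Request:
    """Build (but do not send) a PUT request that creates the index."""
    return urllib.request.Request(
        url=f"{base_url}/{index}",
        data=json.dumps(MAPPING).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )


# Assumption: es HTTP endpoint at 127.0.0.1:9200.
request = build_create_index_request("http://127.0.0.1:9200", "test")
print(request.get_method(), request.full_url)

# To actually create the index, send the request:
# urllib.request.urlopen(request)
```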

The test.yml file under conf/es is configured as follows:

dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: test
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id,a.name,a.address from test a"
  commitBatch: 3000
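To make the effect of this mapping concrete, the sketch below shows how one result row of the configured SQL could be split into a document id and a document body: the column aliased to _id becomes the id, and the remaining columns become fields. This illustrates the intended result only and is not the adapter's implementation; row_to_document is a hypothetical helper, and the row values are the ones used in the test later in this article.

```python
from typing import Any, Dict, Tuple


def row_to_document(row: Dict[str, Any], id_field: str = "_id") -> Tuple[str, Dict[str, Any]]:
    """Split a result row of the mapping SQL into (document id, document body)."""
    body = {key: value for key, value in row.items() if key != id_field}
    return str(row[id_field]), body


# A row as the mapping SQL would return it: a.id is aliased to _id.
row = {"_id": 7, "name": "Beijing", "address": "Chaoyang District, Beijing"}
doc_id, body = row_to_document(row)
print(doc_id, body)
```

With upsert: true, a change to an existing row is expected to update the document with the same id rather than create a second one.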

After the configuration is complete, go to the canal-adapter root directory and run the following command to start the adapter:

bin/startup.sh

View the log:

vi logs/adapter/adapter.log

To stop canal-adapter:

bin/stop.sh

3. Testing

Once everything has started successfully, check es-head first. As shown in the figure, there is no data yet.

Next, insert a row into the database to test, using the following statement:

INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, 'Beijing', 'Chaoyang District, Beijing');

Then check es-head again, as follows:

Now look at the log, as follows:

2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"Beijing","address":"Chaoyang District, Beijing"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"}
Affected indexes: test
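The DML part of that log line is a JSON object, so it can be inspected programmatically when debugging. The sketch below parses the payload from the log line above (written as valid JSON) and prints the change type, the table, and the primary key values. summarize_dml is a hypothetical helper, not part of canal.

```python
import json

# DML payload from the adapter log line above, as valid JSON.
payload = (
    '{"data":[{"id":7,"name":"Beijing","address":"Chaoyang District, Beijing"}],'
    '"database":"test","destination":"example","es":1561197255000,'
    '"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"",'
    '"table":"test","ts":1561197255384,"type":"INSERT"}'
)


def summarize_dml(raw: str) -> str:
    """Return a one-line summary: change type, table, primary key values."""
    event = json.loads(raw)
    keys = [tuple(row[name] for name in event["pkNames"]) for row in event["data"]]
    return f'{event["type"]} {event["database"]}.{event["table"]} keys={keys}'


print(summarize_dml(payload))
# prints: INSERT test.test keys=[(7,)]
```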

In case it is useful, here is the command for viewing the last 200 lines of the log and following new output:

tail -200f logs/adapter/adapter.log

4. Summary

  1. Full synchronization is not possible, but inserts, updates and deletes are synchronized.
  2. Be sure to create the index in advance.
  3. es is configured with its TCP port, for example the default port 9300.