This article describes how to use canal to incrementally synchronize MySQL data to Elasticsearch. (Note: incremental only!)
1. Introduction
1.1 Introduction to canal
Canal is a high-performance data synchronization system based on MySQL binary logs. It is widely used within Alibaba Group (including www.taobao.com) to provide a reliable, low-latency incremental data pipeline. GitHub address: github.com/alibaba/can…
Canal Server parses the MySQL binlog and subscribes to data changes, while Canal Client can broadcast those changes to other destinations, such as databases and Apache Kafka.
It has the following features:
- Support for all platforms.
- Fine-grained system monitoring, powered by Prometheus.
- Supports parsing and subscribing to MySQL binlog in different ways, such as GTID.
- Supports high performance and real-time data synchronization. (See Performance)
- Both Canal Server and Canal Client support HA and scalability, backed by Apache ZooKeeper.
- Docker support.
Disadvantages:
- Full synchronization is not supported; only incremental synchronization is.
Full wiki: github.com/alibaba/can…
1.2 Operation Principle
The principle is simple:
- Canal simulates the interaction protocol of a MySQL slave: it masquerades as a slave and sends a dump request to the MySQL master.
- The MySQL master receives the dump request and starts pushing binary logs to the slave (that is, to canal).
- Canal parses the binary log objects (originally a raw byte stream) into its own data types.
As shown in the figure:
1.3 Synchronizing to ES
Synchronizing data to ES requires an adapter: the Canal Adapter. The latest version at the time of writing, 1.1.3, can be downloaded at github.com/alibaba/can… .
Currently, the adapter appears to support ES 6.x but not 7.x!
2. Preparation
2.1 ES and JDK
To install ES, refer to: www.dalaoyang.cn/article/78
To install the JDK, refer to: www.dalaoyang.cn/article/16
2.2 Installing the Canal Server
Download canal.deployer-1.1.3.tar.gz:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
Unpack the archive:
tar -zxvf canal.deployer-1.1.3.tar.gz
Enter the unpacked folder:
cd canal.deployer-1.1.3
Modify the conf/example/instance.properties file, paying attention mainly to the following:
- canal.instance.master.address: database address, such as 127.0.0.1:3306
- canal.instance.dbUsername: database user
- canal.instance.dbPassword: database password
The full content is as follows:
#################################################
## mysql serverId, v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
# enable gtid use true/false
canal.instance.gtidon=false
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=
# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=
#canal.instance.tsdb.dbUsername=
#canal.instance.tsdb.dbPassword=
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=
# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
# table regex
canal.instance.filter.regex=.*\\..*
# table black regex
canal.instance.filter.black.regex=
# mq config
#canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
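Before starting canal, it can be useful to confirm that the three settings highlighted above are actually filled in. The following is only a minimal sketch: `load_properties` and `missing_keys` are hypothetical helpers written for this article, not part of canal, and the parser handles only simple `key=value` lines and `#` comments.

```python
from typing import Dict, List

# The three settings this article asks you to review.
REQUIRED_KEYS = [
    "canal.instance.master.address",
    "canal.instance.dbUsername",
    "canal.instance.dbPassword",
]


def load_properties(text: str) -> Dict[str, str]:
    """Parse simple key=value lines, skipping blanks and # comments."""
    props: Dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        # Split on the first '=' only, so values may themselves contain '='.
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(props: Dict[str, str]) -> List[str]:
    """Return the required keys that are absent or empty."""
    return [key for key in REQUIRED_KEYS if not props.get(key)]


# Sample content using the values from this article.
sample = """
# position info
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=root
canal.instance.dbPassword=12345678
canal.instance.connectionCharset = UTF-8
"""

print(missing_keys(load_properties(sample)))  # prints []
```

To check a real file, read conf/example/instance.properties into a string and pass it to `load_properties` in the same way.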
Go back to the canal.deployer-1.1.3 directory and start canal:
sh bin/startup.sh
View logs:
vi logs/canal/canal.log
To view a specific instance log:
vi logs/example/example.log
To shut down canal:
sh bin/stop.sh
2.3 Installing canal-adapter
Download canal.adapter-1.1.3.tar.gz:
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
Unpack the archive:
tar -zxvf canal.adapter-1.1.3.tar.gz
Enter the unpacked folder:
cd canal.adapter-1.1.3
Modify the conf/application.yml file, paying attention mainly to the following. Because this is a YAML file, the property names below are written as dotted paths through its nested keys:
- server.port: canal-adapter port number
- canal.conf.canalServerHost: canal-server address (IP and port)
- canal.conf.srcDataSources.defaultDS.url: database URL
- canal.conf.srcDataSources.defaultDS.username: database user name
- canal.conf.srcDataSources.defaultDS.password: database password
- canal.conf.canalAdapters.groups.outerAdapters.hosts: ES host address and TCP port
The full content is as follows:
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp
  canalServerHost: 127.0.0.1:11111
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true
      username: root
      password: 12345678
  canalAdapters:
  - instance: example
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 127.0.0.1:9300
        properties:
          cluster.name: elasticsearch
In addition, you need to configure the conf/es/*.yml files. The adapter automatically loads all configuration files ending in .yml in that directory. Before describing the configuration, here is the table structure used in this example:
CREATE TABLE `test` (
  `id` int(11) NOT NULL,
  `name` varchar(200) NOT NULL,
  `address` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
You need to create the index in ES manually, for example with es-head, as shown in the following figure:
The test index structure is as follows:
{
  "mappings": {
    "_doc": {
      "properties": {
        "name": {
          "type": "text"
        },
        "address": {
          "type": "text"
        }
      }
    }
  }
}
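If es-head is not available, the same index could also be created through the ES REST API. The sketch below only builds the request and does not send it. It assumes that ES serves HTTP on 127.0.0.1:9200 (the usual default, which this article does not state); `ES_HTTP` and `build_create_index_request` are names made up for this example.

```python
import json
import urllib.request

# Assumption: the ES HTTP endpoint. The article itself only mentions TCP port 9300.
ES_HTTP = "http://127.0.0.1:9200"

# The same mapping as the index structure shown above (ES 6.x style, with a type name).
INDEX_BODY = {
    "mappings": {
        "_doc": {
            "properties": {
                "name": {"type": "text"},
                "address": {"type": "text"},
            }
        }
    }
}


def build_create_index_request(index: str) -> urllib.request.Request:
    """Build (but do not send) a PUT request that creates the index."""
    return urllib.request.Request(
        url=f"{ES_HTTP}/{index}",
        data=json.dumps(INDEX_BODY).encode("utf-8"),
        method="PUT",
        headers={"Content-Type": "application/json"},
    )


req = build_create_index_request("test")
print(req.get_method(), req.full_url)
# To actually create the index against a running ES instance:
#   with urllib.request.urlopen(req) as resp:
#       print(resp.read().decode("utf-8"))
```

Keeping the send step commented out makes it possible to inspect the request first and run it only once ES is known to be up.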
Next, create test.yml under conf/es with the following content:
dataSourceKey: defaultDS
destination: example
groupId:
esMapping:
  _index: test
  _type: _doc
  _id: _id
  upsert: true
  sql: "select a.id as _id,a.name,a.address from test a"
  commitBatch: 3000
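One way to read this mapping: the column aliased as `_id` in the SQL becomes the document ID, and the remaining selected columns become the fields of the document. The sketch below illustrates that reading only; `row_to_document` is a hypothetical helper, not the adapter's actual code, and the adapter's real behavior may differ in details.

```python
from typing import Any, Dict, Tuple


def row_to_document(row: Dict[str, Any], id_field: str = "_id") -> Tuple[Any, Dict[str, Any]]:
    """Split one SQL result row into (document ID, document body)."""
    doc_id = row[id_field]
    # Every column except the ID alias goes into the document body.
    source = {name: value for name, value in row.items() if name != id_field}
    return doc_id, source


# A row as the mapping SQL would return it for the test table.
row = {"_id": 7, "name": "Beijing", "address": "Chaoyang District, Beijing"}
doc_id, source = row_to_document(row)
print(doc_id, source)
```

Under this reading, the row with id 7 would end up as document 7 in the test index with the fields name and address.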
After the configuration is complete, go to the canal-adapter root directory and start the adapter:
bin/startup.sh
View the log:
vi logs/adapter/adapter.log
To stop canal-adapter:
bin/stop.sh
3. Testing
After everything has started successfully, check es-head first. As shown in the figure, there is no data yet.
Next, insert a row into the database to test, using the following statement:
INSERT INTO `test`.`test`(`id`, `name`, `address`) VALUES (7, 'Beijing', 'Chaoyang District, Beijing');
Then check es-head again, as follows:
Now look at the adapter log, as follows:
2019-06-22 17:54:15.385 [pool-2-thread-1] DEBUG c.a.otter.canal.client.adapter.es.service.ESSyncService - DML: {"data":[{"id":7,"name":"Beijing","address":"Chaoyang District, Beijing"}],"database":"test","destination":"example","es":1561197255000,"groupId":null,"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test","ts":1561197255384,"type":"INSERT"}
Affected indexes: test
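The JSON after "DML:" in the log line can be parsed to see what was synchronized. The sketch below is for reading logs only. `summarize_dml` is a hypothetical helper, and it assumes that `es` is the time the statement ran on MySQL and `ts` is the time the adapter handled it, both in milliseconds, so their difference approximates the sync lag.

```python
import json
from typing import Any, Dict

# The JSON payload from the log line above.
RAW = (
    '{"data":[{"id":7,"name":"Beijing","address":"Chaoyang District, Beijing"}],'
    '"database":"test","destination":"example","es":1561197255000,"groupId":null,'
    '"isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"test",'
    '"ts":1561197255384,"type":"INSERT"}'
)


def summarize_dml(message: Dict[str, Any]) -> Dict[str, Any]:
    """Extract the operation type, table, primary keys, and approximate lag."""
    pk_names = message.get("pkNames") or []
    rows = message.get("data") or []
    return {
        "type": message["type"],
        "table": f'{message["database"]}.{message["table"]}',
        # One tuple of primary-key values per changed row.
        "keys": [tuple(row[pk] for pk in pk_names) for row in rows],
        # Assumed meaning: adapter timestamp minus MySQL execution time.
        "lag_ms": message["ts"] - message["es"],
    }


print(summarize_dml(json.loads(RAW)))
```

For the message above, the summary reports an INSERT on test.test for primary key 7, with a difference of 384 ms between the two timestamps.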
A small tip: the following command shows the last 200 lines of the log and keeps following new output.
tail -200f logs/adapter/adapter.log
4. Summary
1. Full synchronization is not supported, but inserts, deletes, and updates are synchronized.
2. Be sure to create the index in advance.
3. ES is configured with its TCP port, for example the default port 9300.
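Because the setup depends on several ports, a quick reachability check can help when something does not sync. This is a minimal sketch using the ports that appear in this article's configuration. `port_is_open` is a hypothetical helper, and an open port only shows that something is listening, not that the service is healthy.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Ports taken from the configuration files in this article.
SERVICES = {
    "MySQL": 3306,
    "canal-server": 11111,
    "ES transport (TCP)": 9300,
    "canal-adapter": 8081,
}

for name, port in SERVICES.items():
    state = "open" if port_is_open("127.0.0.1", port) else "closed"
    print(f"{name:20s} 127.0.0.1:{port} {state}")
```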