The product search feature in the mall project does not yet synchronize data in real time. Alibaba's Canal can sync data from MySQL to Elasticsearch in real time. This article walks through how to use Canal; hopefully it helps!

Introduction to Canal

Canal is mainly used to parse the incremental logs (binlog) of a MySQL database and to provide subscription to and consumption of that incremental data. Put simply, it can synchronize MySQL's incremental data in real time to data stores such as MySQL, Elasticsearch, and HBase.

How Canal works

Canal simulates the replication protocol used between a MySQL master and its slaves: it disguises itself as a MySQL slave and sends a dump request to the MySQL master. On receiving the dump request, the master starts pushing its binlog to Canal, and Canal parses the binlog and synchronizes the data to other stores.
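To make the flow above concrete, here is a toy model in Python. It is not Canal's code or API: `RowEvent` and `apply_events` are hypothetical names, a plain list stands in for the binlog stream, and a dict stands in for the target store. It only illustrates the idea of replaying row-level changes in order on the consumer side.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional


@dataclass
class RowEvent:
    """One row-level change, loosely modeled on a ROW-format binlog entry."""
    kind: str                                  # "INSERT", "UPDATE" or "DELETE"
    table: str
    pk: int
    row: Optional[Dict[str, object]] = None    # new column values; None for DELETE


def apply_events(events: List[RowEvent], store: Dict[int, Dict[str, object]]) -> None:
    """Replay events in order against a target store (a dict standing in for an index)."""
    for ev in events:
        if ev.kind == "INSERT":
            store[ev.pk] = dict(ev.row or {})
        elif ev.kind == "UPDATE":
            # Merge the changed columns into the existing document.
            store.setdefault(ev.pk, {}).update(ev.row or {})
        elif ev.kind == "DELETE":
            store.pop(ev.pk, None)
        else:
            raise ValueError(f"unknown event kind: {ev.kind}")


if __name__ == "__main__":
    # A hypothetical stream of changes to one product row.
    binlog = [
        RowEvent("INSERT", "product", 5, {"title": "phone", "price": 1999.0}),
        RowEvent("UPDATE", "product", 5, {"title": "phone v2"}),
    ]
    target: Dict[int, Dict[str, object]] = {}
    apply_events(binlog, target)
    print(target)
```

Because the events are applied strictly in order, the target always converges to the state of the source table, which is why ROW-format binlogs are a convenient basis for this kind of synchronization.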

Using Canal

MySQL syncing data to Elasticsearch in real time

Component download

  • First, download the Canal components canal-server, canal-adapter, and canal-admin. Download address: Github.com/alibaba/can…

  • Each Canal component serves a different purpose, as described below:

    • canal-server (canal-deploy): listens to the MySQL binlog directly by disguising itself as a MySQL slave; it only receives the data and does not process it.
    • canal-adapter: acts as a client of canal-server; it fetches data from canal-server and synchronizes it to stores such as MySQL, Elasticsearch, and HBase.
    • canal-admin: provides operations and maintenance (O&M) functions for Canal, such as overall configuration management and node management, through a user-friendly web UI that makes operations quick and safe.
  • Different versions of MySQL, Elasticsearch, and Canal can be incompatible with one another, so the versions used in this article are fixed as follows:

Application     Port    Version
MySQL           3306    5.7
Elasticsearch   9200    7.6.2
Kibana          5601    7.6.2
canal-server    11111   1.1.5
canal-adapter   8081    1.1.5
canal-admin     8089    1.1.5
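Since every component listens on its own port, a quick reachability check can save debugging time later. The following is a minimal sketch using only the Python standard library; the port numbers come from the table above, while the host `127.0.0.1` is an assumption (all services running on one machine).

```python
import socket
from typing import Dict

# Ports taken from the table above; adjust if your setup differs.
SERVICES: Dict[str, int] = {
    "MySQL": 3306,
    "Elasticsearch": 9200,
    "Kibana": 5601,
    "canal-server": 11111,
    "canal-adapter": 8081,
    "canal-admin": 8089,
}


def is_port_open(host: str, port: int, timeout: float = 1.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    for name, port in SERVICES.items():
        state = "open" if is_port_open("127.0.0.1", port) else "closed"
        print(f"{name:15s} {port:5d} {state}")
```

An open port only shows that something is listening; it does not prove the service behind it is healthy.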

MySQL configuration

  • Since Canal synchronizes data by subscribing to MySQL's binlog, we need to enable MySQL's binlog and set binlog-format to ROW mode. My configuration file is /mydata/mysql/conf/my.cnf, which can be changed to the following content;
[mysqld]
## Set server_id; it must be unique within the same LAN
server_id=101
## Name of the database that does not need to be synchronized (binlog-ignore-db)
## Enable the binary log
log-bin=mall-mysql-bin
## Memory size used by the binary log per transaction (binlog_cache_size)
## Binary log format to use (mixed, statement, row)
binlog_format=row
## Expiration time for cleaning up binary logs; the default value 0 means no automatic cleanup (expire_logs_days)
## Skip all errors, or errors of the specified types, encountered during master/slave replication so that replication on the slave is not interrupted (slave_skip_errors)
## For example, error 1032 is caused by data inconsistency between the master and slave databases
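As a sanity check before restarting MySQL, the binlog-related settings can be verified with a few lines of Python. This is a sketch under assumptions: `check_binlog_settings` is a hypothetical helper, the sample text is illustrative (its values mirror the server id and log name that appear in the outputs later in this article), and `configparser` only approximates how MySQL reads option files.

```python
import configparser
from typing import List

# Illustrative sample; replace with the contents of your own my.cnf.
SAMPLE_MY_CNF = """
[mysqld]
## unique server id
server_id=101
## enable binary logging
log-bin=mall-mysql-bin
## row-based binlog format
binlog_format=row
"""


def check_binlog_settings(cnf_text: str) -> List[str]:
    """Return a list of problems found in the [mysqld] section (empty list means OK)."""
    parser = configparser.ConfigParser(
        allow_no_value=True, strict=False, interpolation=None
    )
    parser.read_string(cnf_text)
    if not parser.has_section("mysqld"):
        return ["missing [mysqld] section"]
    # MySQL treats '-' and '_' in option names as equivalent, so normalize them.
    opts = {k.replace("-", "_"): (v or "") for k, v in parser.items("mysqld")}
    problems = []
    if "server_id" not in opts:
        problems.append("server_id is not set")
    if "log_bin" not in opts:
        problems.append("log-bin is not set")
    if opts.get("binlog_format", "").strip().lower() != "row":
        problems.append("binlog_format is not ROW")
    return problems


if __name__ == "__main__":
    print(check_binlog_settings(SAMPLE_MY_CNF) or "binlog settings look fine")
```

The check is deliberately limited to the three settings Canal depends on; the remaining options only tune retention and replication behavior.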
  • After the configuration, restart MySQL. After the restart, run the following command to check whether binlog is enabled:
show variables like '%log_bin%'
| Variable_name                   | Value                               |
| log_bin                         | ON                                  |
| log_bin_basename                | /var/lib/mysql/mall-mysql-bin       |
| log_bin_index                   | /var/lib/mysql/mall-mysql-bin.index |
| log_bin_trust_function_creators | OFF                                 |
| log_bin_use_v1_row_events       | OFF                                 |
| sql_log_bin                     | ON                                  |
  • Next, check MySQL's binlog format;
show variables like 'binlog_format%';  
| Variable_name | Value |
| binlog_format | ROW   |
  • Next, create an account with replication (slave) privileges that Canal uses to subscribe to the binlog; here the account is canal:canal;
  • Create the test database canal_test, then create the product table in it with the following statement.
CREATE TABLE `product`  (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `sub_title` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  `price` decimal(10,2) NULL DEFAULT NULL,
  `pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
  PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 2 CHARACTER SET = utf8 COLLATE = utf8_general_ci ROW_FORMAT = Dynamic;
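The statements for creating the canal:canal account are not shown in this section. As a hedged sketch, the helper below (a hypothetical function, not part of Canal) builds statements along the lines of what Canal's QuickStart describes: a user with SELECT, REPLICATION SLAVE, and REPLICATION CLIENT privileges. The host pattern `%` is an assumption; restrict it in a real deployment.

```python
from typing import List


def canal_account_sql(user: str = "canal", password: str = "canal", host: str = "%") -> List[str]:
    """Build the MySQL statements that create a replication account for Canal."""
    for value in (user, password, host):
        # Keep the sketch safe: refuse values that would need SQL escaping.
        if "'" in value or "\\" in value:
            raise ValueError("quotes and backslashes are not supported in this sketch")
    return [
        f"CREATE USER '{user}'@'{host}' IDENTIFIED BY '{password}';",
        f"GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO '{user}'@'{host}';",
        "FLUSH PRIVILEGES;",
    ]


if __name__ == "__main__":
    # Print the statements so they can be pasted into a MySQL client.
    print("\n".join(canal_account_sql()))
```

REPLICATION SLAVE lets the account request the binlog stream, REPLICATION CLIENT lets it query the replication status, and SELECT lets Canal read table metadata and data.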
Using canal-server

  • Upload the downloaded archive canal.deployer-1.1.5-SNAPSHOT.tar.gz to the Linux server and extract it into the directory /mydata/canal-server, for example with tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C /mydata/canal-server;
  • After extraction, the directory structure is as follows.
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── canal_local.properties
│   ├── canal.properties
│   └── example
│       └── instance.properties
├── lib
└── logs
    ├── canal
    │   └── canal.log
    └── example
        └── example.log
  • Modify the configuration file conf/example/instance.properties, mainly the database-related settings, as follows.
# MySQL address of the data to be synchronized
canal.instance.master.address=127.0.0.1:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=
# database account used to synchronize data
canal.instance.dbUsername=canal
# database password used to synchronize data
canal.instance.dbPassword=canal
# database connection charset
canal.instance.connectionCharset = UTF-8
# regular expression that filters the tables whose binlog is subscribed
canal.instance.filter.regex=.*\\..*
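The canal.instance.filter.regex setting decides which schema.table names are subscribed. The sketch below approximates that matching in Python under stated assumptions: the value is treated as a comma-separated list of patterns, each matched against the full `schema.table` name. Canal evaluates the patterns with Java regular expressions, so edge cases may differ, and `table_matches` is a hypothetical helper. Note that `.*\\..*` in the properties file is the escaped form of the regex `.*\..*`.

```python
import re


def table_matches(filter_value: str, schema: str, table: str) -> bool:
    """Return True if schema.table fully matches any comma-separated pattern."""
    full_name = f"{schema}.{table}"
    # Splitting on commas means patterns such as a{1,2} are not supported here.
    patterns = [p.strip() for p in filter_value.split(",") if p.strip()]
    return any(re.fullmatch(p, full_name) for p in patterns)


if __name__ == "__main__":
    # Match everything, as in the configuration above.
    print(table_matches(r".*\..*", "canal_test", "product"))
    # A narrower filter that subscribes to a single table only.
    print(table_matches(r"canal_test\.product", "mysql", "user"))
```

Narrowing the filter to the tables you actually sync reduces the volume of events canal-server has to parse and forward.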
  • Start the canal-server service with the startup.sh script;
sh bin/startup.sh
  • After the service is successfully started, run the following command to view service logs.
tail -f logs/canal/canal.log
2020-10-26 16:18:13.354 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the server[]
2020-10-26 16:18:19.978 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
  • After the instance has started successfully, run the following command to view the instance log;
tail -f logs/example/example.log 
2020-10-26 16:18:16.056 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2020-10-26 16:18:16.061 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2020-10-26 16:18:18.259 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2020-10-26 16:18:18.282 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\.slave_.*$
2020-10-26 16:18:19.543 [destination = example , address = / , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-10-26 16:18:19.578 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
[destination = example , address = / , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just last position
 {"identity":{"slaveId":-1,"sourceAddress":{"address":"localhost","port":3306}},"postion":{"gtid":"","included":false,"journalName":"mall-mysql-bin.000006","position":2271,"serverId":101,"timestamp":1603682664000}}
[destination = example , address = / , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mall-mysql-bin.000006,position=2271,serverId=101,gtid=,timestamp=1603682664000] cost : 2768ms , the next step is binlog dump
  • To stop the canal-server service, use the following command.
sh bin/stop.sh
Using canal-adapter

  • Upload the downloaded archive canal.adapter-1.1.5-SNAPSHOT.tar.gz to the Linux server and extract it into the directory /mydata/canal-adapter. After extraction, the directory structure is as follows.
├── bin
│   ├── adapter.pid
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── es6
│   ├── es7
│   │   ├── biz_order.yml
│   │   ├── customer.yml
│   │   └── product.yml
│   ├── hbase
│   ├── kudu
│   ├── logback.xml
│   ├── META-INF
│   │   └── spring.factories
│   └── rdb
├── lib
├── logs
│   └── adapter
│       └── adapter.log
└── plugin
  • Modify the configuration file conf/application.yml, mainly the canal-server configuration, the data source configuration, and the client adapter configuration, as follows.
canal.conf:
  mode: tcp # client mode, one of tcp, kafka, rocketMQ
  flatMessage: true # flat message switch: whether to deliver data as a JSON string; only effective in kafka/rocketMQ mode
  zookeeperHosts:    # ZooKeeper address in cluster mode
  syncBatchSize: 1000 # number of records per synchronization batch
  retries: 0 # number of retries; -1 means unlimited retries
  timeout: # synchronization timeout in milliseconds
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111 # address of canal-server
    canal.tcp.batch.size: 500

  srcDataSources: # source database configuration
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/canal_test?useUnicode=true
      username: canal
      password: canal
  canalAdapters: # adapter list
  - instance: example # canal instance name or MQ topic name
    groups: # group list
    - groupId: g1 # group ID, used in MQ mode
      outerAdapters:
      - name: logger # log print adapter
      - name: es7 # ES synchronization adapter
        hosts: 127.0.0.1:9200 # ES connection address
        properties:
          mode: rest # transport (9300) or rest (9200)
          # security.auth: test:123456 # only used for rest mode
          cluster.name: elasticsearch # ES cluster name
  • Add the configuration file canal-adapter/conf/es7/product.yml, which configures the mapping between the MySQL table and the Elasticsearch index.
dataSourceKey: defaultDS # key of the source data source; corresponds to the entry under srcDataSources configured above
destination: example  # canal instance or MQ topic
groupId: g1 # groupId in MQ mode; only data of this groupId is synchronized
esMapping:
  _index: canal_product # ES index name
  _id: _id  # ES _id; if this item is not configured, the pk item must be configured instead and ES assigns the _id automatically
  sql: "SELECT p.id AS _id, p.title, p.sub_title, p.price, p.pic FROM product p"        # SQL mapping
  etlCondition: "where a.c_time>={}"   # ETL condition parameter
  commitBatch: 3000   # commit batch size
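To illustrate what the mapping above expresses, the sketch below turns one row produced by the mapping SQL into an Elasticsearch document: the column aliased as `_id` becomes the document id, and the remaining columns become fields. `row_to_document` is a hypothetical helper, and leaving the id column out of the document body is an assumption for illustration, not a statement about canal-adapter's internals.

```python
from typing import Any, Dict, Tuple


def row_to_document(row: Dict[str, Any], id_field: str = "_id") -> Tuple[str, Dict[str, Any]]:
    """Split a row returned by the mapping SQL into (document id, document body)."""
    if row.get(id_field) is None:
        raise KeyError(f"row has no value for id field {id_field!r}")
    doc_id = str(row[id_field])
    # Every other selected column becomes a field of the document.
    body = {name: value for name, value in row.items() if name != id_field}
    return doc_id, body


if __name__ == "__main__":
    # A hypothetical row shaped like the result of the mapping SQL.
    sample = {"_id": 5, "title": "phone", "sub_title": "demo", "price": 1999.0, "pic": None}
    print(row_to_document(sample))
```

Using the primary key as `_id` is what lets later UPDATE and DELETE events address the same document that the INSERT created.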
  • Start the canal-adapter service with the startup.sh script;
sh bin/startup.sh
  • After the service is successfully started, run the following command to view service logs.
tail -f logs/adapter/adapter.log
2020-10-26 16:52:55.148 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: logger succeed
2020-10-26 16:52:57.005 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## Start loading es mapping config ...
2020-10-26 16:52:57.376 [main] INFO  c.a.o.c.client.adapter.es.core.config.ESSyncConfigLoader - ## ES mapping config loaded
2020-10-26 16:52:58.615 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Load canal adapter: es7 succeed
2020-10-26 16:52:58.651 [main] INFO  c.alibaba.otter.canal.core.spi.ExtensionLoader - extension classpath dir: /mydata/canal-adapter/plugin
2020-10-26 16:52:59.043 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterLoader - Start adapter for canal-client mq topic: example-g1 succeed
2020-10-26 16:52:59.044 [main] INFO  c.a.o.canal.adapter.launcher.loader.CanalAdapterService - ## the canal client adapters are running now ......
2020-10-26 16:52:59.057 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Start to connect destination: example <=============
2020-10-26 16:52:59.100 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8081"]
2020-10-26 16:52:59.153 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-26 16:52:59.590 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8081 (http) with context path ''
2020-10-26 16:52:59.626 [main] INFO  c.a.otter.canal.adapter.launcher.CanalAdapterApplication - Started CanalAdapterApplication in 31.278 seconds (JVM running for 33.99)
2020-10-26 16:52:59.930 [Thread-4] INFO  c.a.otter.canal.adapter.launcher.loader.AdapterProcessor - =============> Subscribe destination: example succeed <=============
  • To stop the canal-adapter service, use the following command.
sh bin/stop.sh
Data synchronization demo

After the steps above, Canal's data synchronization is basically ready to use. Let's demonstrate it.

  • First, create an index in Elasticsearch that corresponds to the product table in MySQL; it can be created directly in Kibana's Dev Tools with the following command.
PUT canal_product
{
  "mappings": {
    "properties": {
      "title": {
        "type": "text"
      },
      "sub_title": {
        "type": "text"
      },
      "pic": {
        "type": "text"
      },
      "price": {
        "type": "double"
      }
    }
  }
}
  • After the index is created, you can view the index structure.
GET canal_product/_mapping
  • Then create a record in the database using the following SQL statement;
INSERT INTO product ( id, title, sub_title, price, pic ) VALUES ( 5, 'Xiaomi 8', 'Full Screen Gaming smartphone 6GB+64GB', 1999.00, NULL );
  • After the row is created, search in Elasticsearch and you will find that the data has been synchronized.
GET canal_product/_search
  • Then use the following SQL to modify the data;
UPDATE product SET title='Xiaomi 10' WHERE id=5;
  • After the data is modified, search in Elasticsearch again and you will find that the document has been updated as well.

  • Then use the following SQL to delete the data;
DELETE FROM product WHERE id=5
  • After the deletion, search in Elasticsearch once more and you will find that the document has been deleted too.
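The searches in this demo are run by hand in Kibana. If you would rather verify the result from a script, the sketch below builds an `ids` query against the `_search` endpoint and extracts the `_source` of each hit. The base URL and the helper names are assumptions; only the request builder and the response parser are exercised here, so no running Elasticsearch is needed to try them.

```python
import json
import urllib.request
from typing import Any, Dict, List, Tuple


def build_id_search(base_url: str, index: str, doc_id: int) -> Tuple[str, bytes]:
    """Return the URL and JSON body of a search for a single document id."""
    url = f"{base_url.rstrip('/')}/{index}/_search"
    body = json.dumps({"query": {"ids": {"values": [str(doc_id)]}}}).encode("utf-8")
    return url, body


def extract_sources(response: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pull the _source of every hit out of a search response."""
    return [hit["_source"] for hit in response.get("hits", {}).get("hits", [])]


def search_product(base_url: str, doc_id: int) -> List[Dict[str, Any]]:
    """Run the search against a live cluster (requires Elasticsearch to be reachable)."""
    url, body = build_id_search(base_url, "canal_product", doc_id)
    request = urllib.request.Request(url, data=body, headers={"Content-Type": "application/json"})
    with urllib.request.urlopen(request, timeout=5) as resp:
        return extract_sources(json.load(resp))


if __name__ == "__main__":
    # Assumes Elasticsearch listens on 127.0.0.1:9200 as in this article.
    print(search_product("http://127.0.0.1:9200", 5))
```

After the INSERT the list should contain one document, and after the DELETE it should be empty, which makes the three demo steps easy to check in an automated test.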

Using canal-admin

  • Upload the downloaded archive canal.admin-1.1.5-SNAPSHOT.tar.gz to the Linux server and extract it into the directory /mydata/canal-admin. After extraction, the directory structure is as follows.
├── bin
│   ├── restart.sh
│   ├── startup.bat
│   ├── startup.sh
│   └── stop.sh
├── conf
│   ├── application.yml
│   ├── canal_manager.sql
│   ├── canal-template.properties
│   ├── instance-template.properties
│   ├── logback.xml
│   └── public
│       ├── avatar.gif
│       ├── index.html
│       ├── logo.png
│       └── static
├── lib
└── logs
  • Create the database canal_manager that canal-admin needs. The SQL script for creating it is /mydata/canal-admin/conf/canal_manager.sql, which also creates the tables canal-admin uses;

  • Modify the configuration file conf/application.yml, mainly the data source configuration and the canal-admin administrator account. Note that the database account must have read and write permission, such as the administrative account root:root;
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: root
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  adminPasswd: admin
  • Next, configure canal-server's conf/canal_local.properties file, mainly the canal-admin settings. After the changes, restart canal-server with sh bin/startup.sh local:
# register ip
canal.register.ip =

# canal admin config
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster =
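The canal.admin.passwd value above is not plain text. It appears to be the MySQL legacy PASSWORD()-style hash of the adminPasswd value (admin) without the leading `*`, that is, the uppercase hex of SHA1 applied twice. The sketch below, with a hypothetical helper name, shows how such a value can be computed if you change the password.

```python
import hashlib


def mysql_password_hash(plain: str) -> str:
    """Uppercase hex of SHA1(SHA1(plain)), without the leading '*' MySQL prepends."""
    first = hashlib.sha1(plain.encode("utf-8")).digest()
    return hashlib.sha1(first).hexdigest().upper()


if __name__ == "__main__":
    # Compute the value to place in canal.admin.passwd for a given password.
    print(mysql_password_hash("admin"))
```

Keep the value in canal_local.properties consistent with adminPasswd in canal-admin's application.yml; otherwise canal-server cannot register itself with canal-admin.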
  • Start the canal-admin service with the startup.sh script;
sh bin/startup.sh
  • After the service is successfully started, run the following command to view service logs.
tail -f logs/admin.log
2020-10-27 10:15:04.210 [main] INFO  org.apache.coyote.http11.Http11NioProtocol - Starting ProtocolHandler ["http-nio-8089"]
2020-10-27 10:15:04.308 [main] INFO  org.apache.tomcat.util.net.NioSelectorPool - Using a shared selector for servlet write/read
2020-10-27 10:15:04.534 [main] INFO  o.s.boot.web.embedded.tomcat.TomcatWebServer - Tomcat started on port(s): 8089 (http) with context path ''
2020-10-27 10:15:04.573 [main] INFO  com.alibaba.otter.canal.admin.CanalAdminApplication - Started CanalAdminApplication in 31.203 seconds (JVM running for 34.865)
  • Open the canal-admin web UI, which listens on port 8089 as configured above, and log in with the account and password admin:123456;

  • After logging in, you can operate canal-server through the web UI.

