The premise

Recently, the business system architecture is basically complete, but the data level is relatively weak, because the author’s current work is focused on building a small data platform. A higher priority task is to need nearly real-time synchronous business system data (including soft save, update, or delete) to another data source, a persistent need to be cleaned before data and build a relatively reasonable to facilitate subsequent business data statistics, the construction of the tag system extension functions such as data model. Based on the resources and capabilities of the current team, the use of Canal, the open source middleware of Alibaba, was investigated as a priority.

This article briefly describes how to quickly set up a set of Canal-related components.

About the Canal

Briefly describe the background and principles of Canal middleware.

Introduction to the

The following introduction and the principles of the next section are derived from the README of the Canal project:

The main purpose of Canal is to provide incremental data subscription and consumption based on MySQL database incremental log parsing. According to the correct pronunciation of Canal phonetic alphabet and “knock urine” close, rather than many people think Can Nal, “the author was developed for this little sister laughed at.” In the early days, because of the dual-machine room deployment in Hangzhou and the United States, Alibaba had the business demand of cross-machine room synchronization, which was mainly realized by obtaining incremental changes based on business triggers. Since 2010, businesses have gradually tried to parse database logs to obtain incremental changes for synchronization, resulting in a large number of incremental database subscription and consumption services.

Businesses based on incremental log subscription and consumption include:

  • Database mirroring.
  • Backup the database in real time.
  • Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.).
  • businessCacheThe refresh.
  • Incremental data processing with business logic.

How Canal works

Principle of MySQL active/standby replication:

  • MySQLtheMasterThe instance writes data changes to the binary log (binary log, where the records are called binary log eventsbinary log events, can be passedshow binlog eventsTo view)
  • MySQLtheSlaveInstance willmasterthebinary log eventsCopy to its relay log (relay log)
  • MySQLtheSlaveInstance replayrelay logTo reflect data changes to its own data

Here’s how Canal works:

  • CanalsimulationMySQL SlaveThe interaction protocol, pretending to beMySQL SlavetoMySQL Mastersenddumpagreement
  • MySQL MasterreceiveddumpRequest, start pushbinary logtoSlave(i.e.Canal)
  • Canalparsingbinary logObject (originallybyteFlow) and can be sent through the connector to the middleware such as the corresponding message queue

About the version and components of Canal

As of the time of writing this article (2020-03-05), Canal’s latest release was V1.1.5-alpha-1 (2019-10-09) and its latest official release was V1.1.4 (2019-09-02). Among them, v1.1.4 mainly added authentication, monitoring function, and do a series of performance optimization, this version of the integrated connector is Tcp, Kafka and RockerMQ. The RabbitMQ connector has been added in v1.1.5-alpha-1, but the RabbitMQ connector in v1.1.5-alpha-1 does not define the port number for connecting to RabbitMQ. This issue has been fixed in the master branch (see the CanalRabbitMQProducer commit record in the source code). In other words, the only built-in connectors available in V1.1.4 are Tcp, Kafka, and RockerMQ. If you want to try out RabbitMQ connectors, you can do so in one of two ways:

  • chooseV1.1.5 - alpha - 1Version, but cannot be modifiedRabbitMQtheportProperty, which defaults to5672.
  • Based on themasterBranch buildCanal.

Currently, the Canal project is active, but due to the stability of the function, the author recommends a stable version for the production environment. Currently, the V1.1.4 version is available. “In this example, the V1.1.4 version is used with the Kafka connector.” Canal consists of three core components:

  • canal-admin: Background management module, provides orientedWebUItheCanalManagement ability.
  • canal-adapter: adapter to add client data landing adaptation and startup functions, includingREST, log adapter, data synchronization (table to table synchronization) for relational databases,HBaseData synchronization,ESData synchronization and so on.
  • canal-deployer: publisher, where the core functions are, includingbinlogFunctions such as parsing, converting, and sending packets to connectors are provided by this module.

Generally, the Canal-Deployer component is required and the other two components are available as needed.

Deploy the required middleware

To build a working set of components, you need to deploy MySQL, Zookeeper, Kafka, and Canal middleware. The following is a brief analysis of the deployment process. The selected virtual machine system is CentOS7.

MySQL installation

For the sake of simplicity, choose yum source installation (the official link is https://dev.mysql.com/downloads/repo/yum) :

::: info mysql80-community-release-el7-3 ::: info mysql80-community-release-el7-3 ::: info mysql80-community-release-el7-3 ::: info mysql80-community-release-el7-3 ::: info mysql80-community-release-el7-3 ::: info mysql80-community-release-el7-3

Use the latest mysql8.x community version, download the RPM package applicable to CentOS7:

cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// After the download is completesudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm
Copy the code

Mysql-related packages in the yum repository:

[root@localhost mysql]# yum repolist all | grep mysql
Mysql cluster 7.5-community/x86_64 mysql cluster 7.5 community disabledMysql-cluster-7.5-community-source mysql cluster 7.5 community-disabledMysql cluster 7.6-community/x86_64 mysql cluster 7.6 community disabledMysql-cluster-7.6-community-source mysql cluster 7.6 community-disabledMysql cluster 8.0 community/x86_64 mysql cluster 8.0 community disabledMysql -cluster-8.0-community-source mysql cluster 8.0 community-disabledmysql-connectors-community/x86_64 MySQL Connectors Community enabled: 141 mysql-connectors-community-source MySQL Connectors Community - disabled mysql-tools-community/x86_64 MySQL Tools Community enabled: 105 mysql-tools-community-source MySQL Tools Community - Sourc disabled mysql-tools-preview/x86_64 MySQL Tools Preview disabled mysql-tools-preview-source MySQL Tools Preview - Source disabled Mysql55 -community/x86_64 MySQL 5.5 Community Server disabledMysql55-community-source MySQL 5.5 community server-disabledMysql56 -community/x86_64 MySQL 5.6 Community Server disabledMysql56-community-source MySQL 5.6 community server-disabledMysql57 -community/x86_64 MySQL 5.7 community Server disabledMysql57-community-source MySQL 5.7 community server-disabledMysql80 -community/x86_64 MySQL 8.0 Community Server Enabled: 161Mysql80-community-source MySQL 8.0 community server-disabledCopy the code

[mysql80-community] : /etc/yum. Repos. d/ mysql-community-repo: /etc/yum. Repos. d/ mysql-community-repo: /etc/yum.

[mysql80-community]
Name = MySQL Community Server 8.0baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql Copy the code

MySQL > install MySQL server

sudo yum install mysql-community-server
Copy the code

This is a lengthy process because you need to download and install five RPM packages (or the combined package mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar). If the network is poor, you can manually download it from the official website and install it:

// Download the following 5 RPM packages: common --> libs --> libs-compat --> client --> servermysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server  // Mandatory installationRPM -ivh mysql-community-common-8.0.18-1.el7.x86_64. RPM --force --nodeps RPM -ivh mysql-community-common-8.0.18-1.el7.x86_64RPM -ivh mysql-community-libs-8.0.18-1.el7.x86_64. RPM --force --nodeps RPM -ivh mysql-community-libs-8.0.18-1.el7.x86_64. RPM --force --nodepsRPM -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64. RPM --force --nodeps RPM -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64. RPM --force --nodepsRPM -ivh mysql-community-client-8.0.18-1.el7.x86_64. RPM --force --nodeps RPM -ivh mysql-community-client-8.0.18-1.el7.x86_64RPM -ivh mysql-community-server-8.0.18-1.el7.x86_64. RPM --force --nodeps RPM -ivh mysql-community-server-8.0.18-1.el7.x86_64Copy the code

MySQL -u root -p MySQL -u root -p

Mysqld stop mysqld stop mysqld stop mysqld stop mysqld stop mysqld stop mysqld stop mysqld stop mysqld stop mysqldservice mysqld start
// Check the temporary password cat /var/log/mysqld.log[root@localhost log]# cat /var/log/mysqld.log 
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li 2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834 // Use a temporary password to log in as user root[root@localhost log]# mysql -u root -p Copy the code

Next, do the following:

  • Modify therootUser password:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12! @ ';(Note that the password must contain uppercase and lowercase letters, digits, and special characters.)
  • updaterootthehostTo switch the databaseuse mysql;, specifyhostfor%So that it can be accessed remotely by other serversUPDATE USER SET HOST = '%' WHERE USER = 'root';
  • give'root'@'%'User, all rights, goGRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
  • changeroot'@'%User password verification rules so that it can be usedNavicatTools such as:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12! @ ';

Once the operation is complete, you can use the root user to remotely access the MySQL services on this virtual machine. SHOW VARIABLES LIKE ‘%bin%’; :

Finally, run the following command in MySQL Shell to create a new user name canal and password QWqw12! @ new user for REPLICATION SLAVE and REPLICATION CLIENT

CREATE USER canal IDENTIFIED BY 'QWqw12! @ ';GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12! @ ';Copy the code

Switch back to user root and create database test:

CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
Copy the code

Install the Zookeeper

Both the Canal and Kafka clusters rely on Zookeeper for service coordination. To facilitate management, the Zookeeper service or Zookeeper cluster are deployed independently. Here I use version 3.6.0 released on 2020-03-04:

midkr /data/zk
#Creating a data directory
midkr /data/zk/data
cd /data/zk
Wget HTTP: / / http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gzTar ZXVF - apache - they are - 3.6.0 - bin. Tar. GzCD apache - they are - 3.6.0 - bin/confcp zoo_sample.cfg zoo.cfg && vim zoo.cfg Copy the code

Set dataDir to /data/zk/data in the zoo. CFG file and start Zookeeper:

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin /bin/zkserver. sh start/usr/bin/java
ZooKeeper JMX enabled by default
Using the config: / data/zk/apache - they are - 3.6.0 - bin/bin /.. /conf/zoo.cfgStarting zookeeper ... STARTED
Copy the code

Note that to start this version of the Zookeeper service, you must have JDK8+ installed locally, which you need to do yourself. The default port is 2181. After the port is successfully started, the log is as follows:

Install the Kafka

Kafka is a high-performance distributed message queue middleware whose deployment relies on Zookeeper. Here, I use the 2.4.0 package with Scala version 2.13:

mkdir /data/kafka
mkdir /data/kafka/data
Wget HTTP: / / http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgzThe tar - ZXVF kafka_2. 13-2.4.0. TGZCopy the code

Because after decompression/data/kafka/kafka_2. 13-2.4.0 / config/server. The configuration of the properties and the corresponding zookeeper. Connect = localhost: 2181 has been in line with the need, don’t need to modify, Log file directory log.dirs is /data/kafka/data. Then start the Kafka service:

Sh/data/kafka kafka_2. 13-2.4.0 / bin/kafka - server - start. Sh/data/kafka kafka_2. 13-2.4.0 / config/server propertiesCopy the code

This will stop the Kafka process once it exits the console. You can add the -daemon parameter to keep the Kafka process running in the background.

Sh/data/kafka kafka_2. 13-2.4.0 / bin/kafka - server - start. Sh - daemon/data/kafka/kafka_2. 13-2.4.0 / config/server propertiesCopy the code

Install and use Canal

Here we go, using Canal’s v1.1.4 stable release, just download the Deployer module:

mkdir /data/canal
cd /data/canal
#Note that Github is blocked in China, and the download speed is extremely slow. You can use other download tools to download it before uploading it to the server
Wget HTTP: / / https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gzThe tar - ZXVF canal. Deployer - 1.1.4. Tar. GzCopy the code

The decompressed directory is as follows:

-bin # O&M script-conf # configuration fileCanal_local. properties # canal Local configuration, generally do not need to moveProperties # Canal service configurationLogback. XML # logback log configurationMetrics # Metrics statistics configurationSpring # Spring - instance configuration, mainly related to binlog location calculation, some policy configuration, you can choose any of these configuration files in canal.propertiesExample # Instance configuration folder. It is generally assumed that each database corresponds to a separate instance configuration folderInstance. properties # Instance configuration, typically a configuration for a single database- lib # Service dependency packages- logs # Log file output directoryCopy the code

In development and test environments, it is recommended to change the log level of logback. XML to DEBUG to help locate problems. There are two configuration files to look at here: canal.properties and instance.properties. In the canal.properties file, you need to modify:

  • To get rid ofcanal.instance.parser.parallelThreadSize = 16This configuration itemannotationThis configuration item is dependent on the number of threads in the instance parser. If not configured, it will block or not resolve.
  • canal.serverModeThe configuration item is specified askafka, the optional values aretcp,kafkaandrocketmq(masterBranch or latestV1.1.5 - alpha - 1Version, you can chooserabbitmq), the default iskafka.
  • canal.mq.serversThe configuration must be specified asKafkaService or clusterBrokerOf, which is configured here as127.0.0.1:9092.

Canal.mq. Servers have different meanings in different canal.serverMode. In The RocketMQ mode, it is the NameServer list. In the RabbitMQ mode, it is the Host and Port of the RabbitMQ service

For other configuration items, see the links to the two official wikis below:

  • Canal-Kafka-RocketMQ-QuickStart
  • AdminGuide

Properties generally refers to the configuration of a database instance. The Canal schema supports a Canal service instance and handles binlog asynchronous parsing of multiple database instances. Properties Configuration items that need to be modified include:

  • canal.instance.mysql.slaveIdYou need to configure a andMasterNode servicesIDCompletely different value, here I configured for654321.
  • Configure the data source instance, including address, user, password, and target database:
    • canal.instance.master.address, which is specified here127.0.0.1:3306.
    • canal.instance.dbUsername, which is specified herecanal.
    • canal.instance.dbPassword, which is specified hereQWqw12! @.
    • newcanal.instance.defaultDatabaseName, which is specified heretest(need to be inMySQLTo create atestDatabase, see the previous process).
  • KafkaRelated configuration, static is used here for the time beingtopicAnd a singlepartition:
    • canal.mq.topic, which is specified heretest.That’s what we’re done withbinlogStructured data is sent toKafkaThe namedtestthetopicIn the.
    • canal.mq.partition, which is specified here0.

Once the configuration is complete, the Canal service can be started:

sh /data/canal/bin/startup.sh 
#Viewing Service Logs
tail -100f /data/canal/logs/canal/canal
#Look at the instance log -- In general, focus on the instance log
tail -100f /data/canal/logs/example/example.log
Copy the code

After the startup is normal, the instance log is as follows:

Create an order table in the Test database and perform some simple DML:

use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT 'primary key'. order_id VARCHAR(64) NOT NULL COMMENT 'order ID'. amount DECIMAL(10.2) NOT NULL DEFAULT 0 COMMENT 'Order amount'. create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'Creation time'. UNIQUE uniq_order_id (`order_id`) ) COMMENT 'Order Form';  INSERT INTO `order`(order_id, amount) VALUES ('10086'.999); UPDATE `order` SET amount = 10087 WHERE order_id = '10086'; DELETE FROM `order` WHERE order_id = '10086'; Copy the code

In this case, we can use Kafka’s Kafka-console-Consumer or Kafka Tools to view the data for the test topic:

Sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer. Sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic  testCopy the code

The specific data are as follows:

// test Database creation script{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql ":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}
// Create table DDL for order table{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":" CREATE TABLE 'order' \n(\n ID BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT 'PRIMARY KEY ',\n order_id VARCHAR(64) NOT NULL COMMENT 'order ID',\n amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT 'CURRENT_TIMESTAMP ',\n create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT' create time ',\n UNIQUE Uniq_order_id (' order_id ')\n) COMMENT ","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"} // INSERT Binlog event{" data ": [{" id" : "1", "order_id" : "10086", "amount" : "999.0", "create_time" : "2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64) ", "amount" : "a DECIMAL (10, 2)", "create_time" : "DATETIME"}, "old", null, "pkNames" : (" id "), "SQL" : ""," sqlType ": {" id" : - 5, "order_id" : 12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"} // UPDATE Binlog event{" data ": [{" id" : "1", "order_id" : "10086", "amount" : "10087.0", "create_time" : "2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64) ", "amount" : "a DECIMAL (10, 2)", "create_time" : "DATETIME"}, "old" : [{" amount ":" 999.0 "}], "pkNames" : (" id "), "SQL" : ""," sqlType ": {" id ":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"} // DELETE the Binlog event{" data ": [{" id" : "1", "order_id" : "10086", "amount" : "10087.0", "create_time" : "2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64) ", "amount" : "a DECIMAL (10, 2)", "create_time" : "DATETIME"}, "old", null, "pkNames" : (" id "), "SQL" : ""," sqlType ": {" id" : - 5, "order_id" : 12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}Copy the code

We can see that Kafka’s topic named test has written the corresponding structured binlog event data. We can write the consumer to listen to Kafka’s corresponding topic and then process the obtained data. Here sent data structure can consult Canal source (the current editor of time for the 2020-07-03 master branch) of com. Alibaba. Otter. Canal. Protocol. FlatMessage:

One note:

  • FlatMessage.dataIs the currentDMLNewly written data, andFlatMessage.oldIs the historical data before the new data is writtenUPDATEThe type ofDMLSpeaking,FlatMessage.dataandFlatMessage.oldThere is data.
  • FlatMessage.sqlTypetheMap.Entry#value()Student: In generaljava.sql.JDBCTypeThis enumeration has a consistent mapping and can be resolved to match each oneColumnProperties of theJDBCType, and then according to the need to convert into the appropriateJavaType is ok.
  • To improve transmission efficiency,CanalWhen sent to the message middleware, the message is merged, oneFlatMessageIt is possible to contain multiple different records of the same type of event, noteFlatMessage.dataisList<Map<String, String>>Type, such as for the same tableINSERTEvents that may be merged into the sameFlatMessageInstance,FlatMessage.dataContains two elements.
  • CanalSent to theFlatMessage“, usingFastJsonSerialization, we’ve seen a lot lately aboutFastJsonYou need to be mentally prepared for a version upgrade.

summary

Most used in this article to introduce how other middleware deployment, this problem illustrates the Canal side deployment itself is not complicated, its configuration file attributes item is more, but actually need to customize and change the configuration of the item is less, is also illustrates its operational cost and learning cost is not high.

At present, the author is responsible for architecture, part of operation and maintenance, and the construction of data center. Some time ago, I led the migration of the whole set of online services from UCloud to Aliyun, and applied cloud RDS MySQL. Meanwhile, I built a set of Canal HA cluster for subscriping to the data of core services. Falling into a continuously modeled data warehouse, some real-time cache calculations and updates are performed based on near real-time Binlog events, and some visual charts are generated to provide multi-dimensional ICONS for real-time monitoring of operational metrics with Metabase. In the process, a relatively large number of potholes were stepped. Parsing the Binlog events generated by Canal took a lot of time in the early stage. Considering the low development efficiency, the author spent some time writing a glue layer for parsing Binlog events, realizing the function of object mapping without perception, and releasing productivity. The next article will look at the implementation of the Binlog event parsing tool, and then share the problems encountered and solutions, including the Canal stop-start exception (which is quite large if you use cloud RDS MySQL) and the Binlog event sequence.

References:

  • A Quick Guide to Using the MySQL Yum Repository
  • Canal

(C-3-D E-A-20200306 R-A-20200709)

Throwable Digest (ID: Throwable Doge), the public account of Throwable Digest (ID: Throwable Doge), pushes the author’s original technical articles from time to time (never plagiarize or reprint) :

This article was typeset using MDNICE