Background
Recently, the architecture of our business systems has become largely complete, but the data side is still relatively weak, and my current work focuses on building a small data platform. A high-priority task is to synchronize business system data (including data that is saved, updated, or soft-deleted) in near real time to another data source, clean the data before it is persisted, and build a reasonably structured data model that supports subsequent statistics, a tag system, and other extended features. Based on the resources and capabilities of the current team, Alibaba's open-source middleware Canal was investigated as the first choice.
This article briefly describes how to quickly set up a set of Canal-related components.
About Canal
Briefly describe the background and principles of Canal middleware.
Introduction
The following introduction and the principles of the next section are derived from the README of the Canal project:
The main purpose of Canal is to provide incremental data subscription and consumption based on parsing the incremental logs (binlog) of a MySQL database. Incidentally, the correct pronunciation of Canal is closer to [kəˈnæl] rather than the "Can-Nal" many people assume. In the early days, because of its dual-datacenter deployment in Hangzhou and the United States, Alibaba had a business need for cross-datacenter synchronization, which was mainly implemented by capturing incremental changes through business triggers. Since 2010, the business has gradually switched to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption services.
Businesses based on incremental log subscription and consumption include:
- Database mirroring.
- Backup the database in real time.
- Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.).
- Business cache refresh.
- Incremental data processing with business logic.
How Canal works
Principle of MySQL active/standby replication:
- The MySQL Master instance writes data changes to the binary log (the records in it are called binary log events and can be viewed with show binlog events).
- The MySQL Slave instance copies the Master's binary log events to its relay log.
- The MySQL Slave instance replays the events in the relay log to reflect the data changes in its own data.
Here’s how Canal works:
- Canal simulates the MySQL Slave interaction protocol, pretends to be a MySQL Slave, and sends the dump protocol request to the MySQL Master.
- The MySQL Master receives the dump request and starts pushing the binary log to the Slave (i.e. Canal).
- Canal parses the binary log objects (originally a byte stream) and can send them through connectors to corresponding message middleware such as message queues.
About the version and components of Canal
As of the time of writing (2020-03-05), Canal's latest snapshot release was v1.1.5-alpha-1 (2019-10-09) and its latest official release was v1.1.4 (2019-09-02). v1.1.4 mainly added authentication and monitoring functions and made a series of performance optimizations; the connectors integrated in this version are TCP, Kafka, and RocketMQ. The RabbitMQ connector was added in v1.1.5-alpha-1, but that version does not allow the port number used to connect to RabbitMQ to be defined. This issue has been fixed on the master branch (see the CanalRabbitMQProducer commit history in the source code). In other words, the only built-in connectors available in v1.1.4 are TCP, Kafka, and RocketMQ. If you want to try the RabbitMQ connector, you can do so in one of two ways:
- Choose the v1.1.5-alpha-1 version, but you cannot modify the RabbitMQ port property, which defaults to 5672.
- Build Canal from the master branch.
The Canal project is currently active, but for the sake of functional stability I recommend a stable release for production environments; at present that is v1.1.4. In this example, v1.1.4 is used together with the Kafka connector. Canal consists of three core components:
- canal-admin: the background management module, providing WebUI-based management capabilities for Canal.
- canal-adapter: the adapter, which adds client-side data landing adaptation and startup functions, including a REST interface, a log adapter, data synchronization for relational databases (table-to-table synchronization), HBase data synchronization, ES data synchronization, and so on.
- canal-deployer: the deployer, where the core functions live; binlog parsing, conversion, and sending packets to connectors are all provided by this module.
Generally, only the canal-deployer component is required; the other two components can be used as needed.
Deploy the required middleware
To build a working set of components, you need to deploy the MySQL, Zookeeper, Kafka, and Canal middleware. The following briefly walks through the deployment process. The virtual machine OS used is CentOS 7.
MySQL installation
For the sake of simplicity, choose installation from the yum repository (the official link is https://dev.mysql.com/downloads/repo/yum):
Use the latest mysql8.x community version, download the RPM package applicable to CentOS7:
cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
# After the download is complete
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm
Mysql-related packages in the yum repository:
[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64   MySQL Cluster 7.5 Community            disabled
mysql-cluster-7.5-community-source   MySQL Cluster 7.5 Community - Source   disabled
mysql-cluster-7.6-community/x86_64   MySQL Cluster 7.6 Community            disabled
mysql-cluster-7.6-community-source   MySQL Cluster 7.6 Community - Source   disabled
mysql-cluster-8.0-community/x86_64   MySQL Cluster 8.0 Community            disabled
mysql-cluster-8.0-community-source   MySQL Cluster 8.0 Community - Source   disabled
mysql-connectors-community/x86_64    MySQL Connectors Community             enabled: 141
mysql-connectors-community-source    MySQL Connectors Community - Source    disabled
mysql-tools-community/x86_64         MySQL Tools Community                  enabled: 105
mysql-tools-community-source         MySQL Tools Community - Source         disabled
mysql-tools-preview/x86_64           MySQL Tools Preview                    disabled
mysql-tools-preview-source           MySQL Tools Preview - Source           disabled
mysql55-community/x86_64             MySQL 5.5 Community Server             disabled
mysql55-community-source             MySQL 5.5 Community Server - Source    disabled
mysql56-community/x86_64             MySQL 5.6 Community Server             disabled
mysql56-community-source             MySQL 5.6 Community Server - Source    disabled
mysql57-community/x86_64             MySQL 5.7 Community Server             disabled
mysql57-community-source             MySQL 5.7 Community Server - Source    disabled
mysql80-community/x86_64             MySQL 8.0 Community Server             enabled: 161
mysql80-community-source             MySQL 8.0 Community Server - Source    disabled
The [mysql80-community] repository is enabled by default; its definition lives in /etc/yum.repos.d/mysql-community.repo:
[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
Then install the MySQL server:
sudo yum install mysql-community-server
This is a lengthy process because you need to download and install five RPM packages (or the combined package mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar). If the network is poor, you can manually download it from the official website and install it:
# Download the following 5 RPM packages: common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server

# Force the installation, in the same order
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps
After the installation is complete, start the mysqld service, look up the temporary password in the log, and log in as root with mysql -u root -p:

# Start the mysqld service
service mysqld start
# Check the temporary password
[root@localhost log]# cat /var/log/mysqld.log
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
# Use the temporary password to log in as user root
[root@localhost log]# mysql -u root -p
Next, do the following:
- Modify the root user's password: ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@'; (note that the password must contain uppercase and lowercase letters, digits, and special characters).
- Update the root user's host: switch to the mysql database with use mysql; and set the host to % so that the account can be accessed remotely from other servers: UPDATE USER SET HOST = '%' WHERE USER = 'root';
- Grant all privileges to the 'root'@'%' user: GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
- Change the password verification plugin of the 'root'@'%' user so that tools such as Navicat can connect: ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

The full sequence is collected in the block below.
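For convenience, the statements from the list above can be run as one sequence in the MySQL shell (a consolidated sketch; a FLUSH PRIVILEGES is added after editing the user table directly so the host change takes effect):

ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';
use mysql;
UPDATE user SET host = '%' WHERE user = 'root';
FLUSH PRIVILEGES;
GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';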
Once these operations are complete, the root user can remotely access the MySQL service on this virtual machine. Canal needs the binary log to be enabled in ROW format; MySQL 8.x enables it in ROW format by default, which you can confirm with SHOW VARIABLES LIKE '%bin%';.
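A minimal check of the two variables that matter for Canal (the values shown are the MySQL 8.x defaults):

SHOW VARIABLES LIKE 'log_bin';        -- expected: ON  (the binary log is enabled by default in MySQL 8.x)
SHOW VARIABLES LIKE 'binlog_format';  -- expected: ROW (the row-based format that Canal relies on)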
Finally, run the following statements in the MySQL shell to create a new user named canal with the password QWqw12!@ and grant it the REPLICATION SLAVE and REPLICATION CLIENT privileges:
CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
Switch back to user root and create database test:
CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
Install Zookeeper
Both the Canal and Kafka clusters rely on Zookeeper for service coordination. To facilitate management, the Zookeeper service or Zookeeper cluster are deployed independently. Here I use version 3.6.0 released on 2020-03-04:
mkdir /data/zk
# Create the data directory
mkdir /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
Set dataDir to /data/zk/data in the zoo.cfg file and start Zookeeper:
[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
Note that to start this version of the Zookeeper service, JDK 8+ must be installed locally, which you need to do yourself. The default client port is 2181; the startup output above shows that the service started successfully.
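Optionally, you can double-check that the instance is serving (using the same installation path as above):

sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh status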
Install Kafka
Kafka is a high-performance distributed message queue middleware whose deployment relies on Zookeeper. Here, I use the 2.4.0 package with Scala version 2.13:
mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz
After decompression, the configuration in /data/kafka/kafka_2.13-2.4.0/config/server.properties already meets our needs (the default zookeeper.connect=localhost:2181 is correct), so nothing needs to change apart from pointing the log file directory log.dirs at /data/kafka/data. Then start the Kafka service:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
Started this way, the Kafka process stops as soon as you exit the console. You can add the -daemon parameter to keep the Kafka process running in the background:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
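As an optional sanity check (assuming the default listener on port 9092), you can confirm the broker is reachable by listing the (still empty) topic list:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092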
Install and use Canal
Now we get to the point: we use Canal's v1.1.4 stable release and only need to download the deployer module:
mkdir /data/canal
cd /data/canal
# Note that GitHub is blocked in China and the download speed is extremely slow; you can use another download tool to fetch the archive and then upload it to the server
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz
The decompressed directory is as follows:
- bin   # operation and maintenance scripts
- conf  # configuration files
    canal_local.properties   # canal local configuration, generally does not need to be changed
    canal.properties         # canal service configuration
    logback.xml              # logback log configuration
    metrics                  # metrics statistics configuration
    spring                   # spring instance configuration, mainly related to binlog position calculation and some policy configuration; any of these files can be selected in canal.properties
    example                  # instance configuration folder; generally each database corresponds to a separate instance configuration folder
        instance.properties  # instance configuration, typically the configuration for a single database
- lib   # service dependency packages
- logs  # log file output directory
In development and test environments, it is recommended to change the log level in logback.xml to DEBUG to help locate problems. There are two configuration files to focus on here: canal.properties and instance.properties. In canal.properties, you need to modify:
- Uncomment the canal.instance.parser.parallelThreadSize = 16 configuration item. It controls the number of threads used by the instance parser; if it is left unconfigured, parsing will block or simply not run.
- Set the canal.serverMode configuration item to kafka. The optional values are tcp, kafka, and rocketmq (on the master branch or the latest v1.1.5-alpha-1 version you can also choose rabbitmq); the default is kafka.
- Set canal.mq.servers to the Broker address of the Kafka service or cluster, which is configured here as 127.0.0.1:9092.

The resulting lines are collected in the snippet after the note below.
❝
canal.mq.servers has a different meaning depending on canal.serverMode: in RocketMQ mode it is the NameServer list, and in RabbitMQ mode it is the host and port of the RabbitMQ service.
❞
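For reference, the relevant lines of canal.properties after the changes above look roughly like this (an excerpt under the assumptions of this walkthrough; all other items keep their defaults):

canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092
# uncommented so that the instance parser has an explicit thread pool size
canal.instance.parser.parallelThreadSize = 16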
For other configuration items, see the links to the two official wikis below:
- Canal-Kafka-RocketMQ-QuickStart
- AdminGuide
instance.properties generally refers to the configuration of a single database instance; the Canal architecture supports one Canal service instance handling asynchronous binlog parsing for multiple database instances. The instance.properties configuration items that need to be modified are:
- canal.instance.mysql.slaveId: configure a value that is completely different from the server ID of the Master node; here I configured 654321.
- Configure the data source instance, including address, user, password, and target database:
    - canal.instance.master.address: specified here as 127.0.0.1:3306.
    - canal.instance.dbUsername: specified here as canal.
    - canal.instance.dbPassword: specified here as QWqw12!@.
    - Add canal.instance.defaultDatabaseName, specified here as test (the test database must be created in MySQL first; see the earlier step).
- Kafka-related configuration; a static topic and a single partition are used here for the time being:
    - canal.mq.topic: specified here as test, that is, the structured data parsed from the binlog is sent to the Kafka topic named test.
    - canal.mq.partition: specified here as 0.

The modified lines are collected in the snippet below.
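Put together, the modified lines of example/instance.properties look roughly like this (an excerpt under the assumptions above; the remaining items keep their defaults):

canal.instance.mysql.slaveId = 654321
canal.instance.master.address = 127.0.0.1:3306
canal.instance.dbUsername = canal
canal.instance.dbPassword = QWqw12!@
canal.instance.defaultDatabaseName = test
canal.mq.topic = test
canal.mq.partition = 0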
Once the configuration is complete, the Canal service can be started:
sh /data/canal/bin/startup.sh
#Viewing Service Logs
tail -100f /data/canal/logs/canal/canal.log
#Look at the instance log -- In general, focus on the instance log
tail -100f /data/canal/logs/example/example.log
Copy the code
After a normal startup, the example instance log shows that the instance has started successfully and is waiting for binlog events.
Create an order table in the test database and perform some simple DML statements:
use `test`;
CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT 'primary key',
    order_id    VARCHAR(64) NOT NULL COMMENT 'order ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT 'order amount',
    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT 'order table';

INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE FROM `order` WHERE order_id = '10086';
At this point, we can use Kafka's kafka-console-consumer or Kafka Tools to view the data in the test topic:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
The specific data are as follows:
// DDL for creating the test database
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}
// DDL for creating the order table
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n    id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT 'primary key',\n    order_id VARCHAR(64) NOT NULL COMMENT 'order ID',\n    amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT 'order amount',\n    create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'creation time',\n    UNIQUE uniq_order_id (`order_id`)\n) COMMENT 'order table'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}
// INSERT binlog event
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10, 2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}
// UPDATE binlog event
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10, 2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}
// DELETE binlog event
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10, 2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}
We can see that Kafka’s topic named test has written the corresponding structured binlog event data. We can write the consumer to listen to Kafka’s corresponding topic and then process the obtained data. Here sent data structure can consult Canal source (the current editor of time for the 2020-07-03 master branch) of com. Alibaba. Otter. Canal. Protocol. FlatMessage:
A few things to note:

- FlatMessage.data is the data newly written by the current DML, while FlatMessage.old is the historical data before the new data was written. For a DML of type UPDATE, both FlatMessage.data and FlatMessage.old contain data.
- The values in FlatMessage.sqlType (Map.Entry#value()) generally map one-to-one to the java.sql.JDBCType enumeration, so you can resolve the JDBCType matching each column and then convert the value into the appropriate Java type as needed.
- To improve transmission efficiency, Canal merges messages when sending them to the message middleware, so a single FlatMessage may contain multiple records of the same event type. Note that FlatMessage.data is of type List<Map<String, String>>: for example, two INSERT events on the same table may be merged into the same FlatMessage instance, in which case FlatMessage.data contains two elements.
- The FlatMessage sent by Canal is serialized with FastJson; given the many FastJson vulnerabilities disclosed recently, you need to be mentally prepared for version upgrades.
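To make the consumer side concrete, here is a minimal sketch of a Kafka consumer that deserializes each record into FlatMessage with FastJson and prints the event type and row data. It is only an illustration under the assumptions of this walkthrough (broker at 127.0.0.1:9092, topic test, and kafka-clients, fastjson, and the canal.protocol module on the classpath); the FlatMessage accessors used follow the field names visible in the JSON above, and this is not the glue layer mentioned in the summary:

import com.alibaba.fastjson.JSON;
import com.alibaba.otter.canal.protocol.FlatMessage;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class CanalKafkaConsumerDemo {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "canal-demo-group");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    // Each record value is a FastJson-serialized FlatMessage
                    FlatMessage message = JSON.parseObject(record.value(), FlatMessage.class);
                    // A merged message may carry several rows of the same event type
                    System.out.printf("type=%s, table=%s, data=%s, old=%s%n",
                            message.getType(), message.getTable(), message.getData(), message.getOld());
                }
            }
        }
    }
}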
Summary
Most of this article was spent on how to deploy the other middleware, which in itself shows that deploying Canal is not complicated. Its configuration files contain many property items, but the items that actually need to be customized or changed are few, which also shows that its operational cost and learning cost are not high.
At present, I am responsible for the architecture, part of the operations work, and the construction of the data platform. Some time ago I led the migration of the entire set of online services from UCloud to Aliyun, adopting cloud RDS MySQL, and at the same time built a Canal HA cluster for subscribing to the data of core services, landing it into a continuously modeled data warehouse, performing some real-time cache calculations and updates based on near-real-time binlog events, and generating visual charts with Metabase to provide multi-dimensional, real-time monitoring of operational metrics. Quite a few pitfalls were encountered in the process. Parsing the binlog events produced by Canal took a lot of time in the early stage; considering the low development efficiency, I spent some time writing a glue layer for parsing binlog events that implements transparent object mapping and frees up productivity. The next article will look at the implementation of that binlog event parsing tool, and then share the problems encountered and their solutions, including Canal stop/start exceptions (which are quite frequent if you use cloud RDS MySQL) and binlog event ordering.
References:
- A Quick Guide to Using the MySQL Yum Repository
- Canal
(C-3-D E-A-20200306 R-A-20200709)
This article was first published on the author's WeChat public account, Throwable Digest (ID: Throwable Doge), which pushes the author's original technical articles from time to time (never plagiarized or reprinted).
This article was typeset using MDNICE