1. Foreword
Canal is an open-source project from Alibaba, developed in pure Java. Based on incremental database log parsing, it provides incremental data subscription & consumption. Currently it mainly supports MySQL (MariaDB is also supported).
- Canal simulates the MySQL slave interaction protocol, disguises itself as a MySQL slave, and sends the dump protocol to the MySQL master.
- The MySQL master receives the dump request and starts pushing binary logs to the slave (canal).
- Canal parses the binary log objects (originally a byte stream).
Overall structure:
2. Deployment preparations
Download: github.com/alibaba/can…
Download respectively: canal.admin, canal.deployer, canal.adapter
PS: Only versions 1.1.5 and later support ES7.x
Other dependencies:
- JDK1.8
- MySQL: used for canal-admin to store configuration and node data
- Zookeeper
3. HA mechanism
The HA mechanism relies on two ZooKeeper features: watchers and EPHEMERAL nodes. Canal's HA implementation is split into two parts, with corresponding implementations on the Canal Server side and the Canal Client side.
The Canal Server side works as follows:
- When a Canal Server wants to start a canal instance, it first attempts to create an EPHEMERAL node in ZooKeeper.
- The Canal Server that successfully creates the ZooKeeper node starts the canal instance; the servers that fail to create it remain in standby state.
- Once ZooKeeper detects that the node created by Canal Server A has disappeared, it immediately notifies the other Canal Servers to repeat step 1, and another Canal Server is selected to start the instance.
- Each time the Canal Client connects, it first asks ZooKeeper which server currently runs the canal instance and then establishes a connection with it. If the connection becomes unavailable, it retries.
PS: To reduce dump requests to MySQL, a given instance must be running on only one server at a time; the copies on the other servers stay in standby state.
The Canal Client side works as follows:
- The Canal Client approach is similar to the Canal Server approach: it is also controlled by preempting an EPHEMERAL node in ZooKeeper.
- To guarantee ordering, only one Canal Client may perform get/ack/rollback operations on an instance at a time; otherwise ordering cannot be guaranteed.
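For example, you can inspect in ZooKeeper which server currently holds the running lock of an instance. A quick check with zkCli.sh, assuming the default /otter/canal root path and an instance named example (the exact node layout may vary slightly between canal versions):

ls /otter/canal/destinations/example
get /otter/canal/destinations/example/running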
4. Cluster deployment
4.1. Prepare MySQL
4.1.1. Enable binlog
Edit the MySQL my.cnf:
[mysqld]
log-bin=mysql-bin # enable binlog
binlog-format=ROW # use ROW mode
server_id=1 # MySQL replication server id; must not collide with canal's slaveId
Note: if you subscribe to a MySQL slave, the slave must also be configured to write replicated updates to its own binlog:
log_slave_updates=1
You can run the following commands in the MySQL terminal to check whether the configuration has taken effect:
show variables like 'log_bin';
show variables like 'binlog_format';
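If the configuration has taken effect, the expected values are:

log_bin         ON
binlog_format   ROW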
4.1.2. Grant account permissions
Create a canal account and grant it the permissions required to act as a MySQL slave:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
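You can verify the grants afterwards with:

SHOW GRANTS FOR 'canal'@'%';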
4.2. Deploy canal-admin
4.2.1. Role
- Manage configuration parameters through a graphical user interface (GUI)
- Dynamically start and stop Servers and Instances
- View log information
4.2.2. Execute database scripts
Use the canal_manager.sql script from the conf directory of the downloaded package to initialize the required tables.
The script creates the canal_manager database by default; it is recommended to run it with a superuser account such as root.
4.2.3. Configuration Modification
Run vim conf/application.yml:
server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: admin
Modify the address, database, username, and password parameters
4.2.4. Start and Stop commands
Start:
sh bin/startup.sh
Stop:
sh bin/stop.sh
4.2.5. Use
You can log in to the admin console at http://127.0.0.1:8089/. The default account is admin and the password is 123456.
4.2.5.1. Create a Cluster
Configure the cluster name and the ZooKeeper address.
Configure the main configuration (this configuration is shared by all Servers/Instances in the cluster) and modify the following items:
- canal.zkServers: set to the ZooKeeper cluster address
- canal.instance.global.spring.xml: set to classpath:spring/default-instance.xml
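The corresponding lines in the cluster's main configuration (canal.properties) then look roughly like this (the ZooKeeper addresses are placeholders for your own cluster):

canal.zkServers = 192.168.0.1:2181,192.168.0.2:2181,192.168.0.3:2181
canal.instance.global.spring.xml = classpath:spring/default-instance.xml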
4.2.5.2. Create a Server
Configuration items:
- Owning cluster: can be standalone (single machine) or a cluster; standalone mode is mainly used for one-off or test tasks
- Server name: unique and easy to remember
- Server IP: the machine's IP address
- Admin port: new in canal 1.1.4, used for remote management operations on the canal-server; default 11110
- TCP port: the port on which canal provides the Netty-based data subscription service
- Metric port: the Prometheus exporter port for monitoring data (monitoring integration to be provided later)
Associating multiple Servers with the same cluster forms an active/standby HA architecture.
4.2.5.3. Create an Instance
Each Instance is associated with one data source to be synchronized; if there are multiple data sources, create multiple Instances.
- First fill in the instance name
- Select the cluster you just created
- Load template configuration
Modify the following configurations:
- canal.instance.master.address: the address of the database to synchronize
- canal.instance.dbUsername: database username (must have the replication permissions granted above)
- canal.instance.dbPassword: database password
- canal.instance.filter.regex: the tables that mysql data parsing should focus on, as Perl regular expressions. Multiple regexes are separated by commas (,); escaping requires a double backslash (\\).
Common examples of canal.instance.filter.regex:
- All tables: .* or .*\\..*
- All tables under the canal schema: canal\\..*
- Tables under the canal schema whose names start with canal: canal\\.canal.*
- A single table under the canal schema: canal.test1
- Multiple rules combined: canal\\..*,mysql.test1,mysql.test2 (comma separated)
Note: this filter condition only applies to data in ROW binlog format (mixed/statement format does not parse SQL, so the tableName cannot be extracted accurately).
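Put together, the corresponding lines of the instance configuration look roughly like this (address, credentials and regex are illustrative placeholders):

canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.filter.regex=mytest\\..*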
4.3. Deploy canal-deployer
4.3.1. Role
- Disguises itself as a MySQL slave to synchronize the binlog of the master library
- Parses the binary log into structured objects
4.3.2. Modify the configuration
Run the vim conf/canal_local.properties command to change the canal.admin.manager configuration item to the canal-admin address
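A sketch of the relevant part of conf/canal_local.properties (key names follow canal 1.1.x; the admin password value is the hashed form stored by canal-admin, shown here as a placeholder):

# ip registered with canal-admin (defaults to the local address if empty)
canal.register.ip =
# canal-admin address
canal.admin.manager = 127.0.0.1:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = <password hash>
# automatically register this server to the given cluster in canal-admin
canal.admin.register.auto = true
canal.admin.register.cluster =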
4.3.3. Start and Stop commands
Start with the local configuration:
bin/startup.sh local
Stop:
bin/stop.sh
4.4. Deploy canal-adapter
4.4.1. Role
- Consumes upstream messages; Kafka, RocketMQ, and canal-server are supported as sources
- Implement incremental synchronization of mysql data
- Implement full synchronization of mysql data
- Downstream write supports mysql, ES, and hbase
4.4.2. Modify the configuration
Note: The Adapter supports dynamic configuration, which means that the task automatically refreshes the configuration after modifying the configuration file without restarting it!
(1) Modify application.yml
Run vim conf/application.yml to modify the configuration of consumerProperties, srcDataSources, and canalAdapters
canal.conf:
  mode: tcp # tcp kafka rocketMQ
  flatMessage: true # flat-message switch: whether to deliver data as JSON strings; only effective in kafka/rocketMQ mode
  syncBatchSize: 1000 # number of records per synchronization batch
  retries: 0 # number of retries, -1 means retry forever
  timeout: # synchronization timeout in milliseconds
  consumerProperties:
    canal.tcp.server.host: # canal-server address in standalone mode
    canal.tcp.zookeeper.hosts: 127.0.0.1:2181 # ZooKeeper address in cluster mode; if canal.tcp.server.host is configured, that takes precedence
    canal.tcp.batch.size: 500 # number of messages per TCP pull
  srcDataSources: # source databases
    defaultDS: # custom name
      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # jdbc url
      username: root # jdbc username
      password: 121212 # jdbc password
  canalAdapters: # adapter list
  - instance: example # canal instance name or MQ topic name
    groups: # group list
    - groupId: g1 # group id, used in MQ mode
      outerAdapters: # list of adapters within the group
      - name: es7 # ES7 adapter
        mode: rest # transport or rest
        hosts: 127.0.0.1:9200 # es address
        security.auth: test:123456 # authentication info for accessing es; omit if not required
        cluster.name: my-es # must be configured in transport mode
  ...
- A single piece of data can be consumed by multiple groups at the same time; groups execute in parallel, while the outerAdapters within one group execute serially, for example logger and hbase.
- Currently the client adapter supports two ways of subscribing to data: connecting directly to canal-server, or subscribing to Kafka/RocketMQ messages.
(2) Add a mapping configuration file in the conf/es7 directory
The adapter automatically loads every configuration file ending in .yml under conf/es7.
Add configuration files for table mapping, such as sys_user.yml, as follows:
dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: sys_user
  _id: id
  upsert: true
  sql: "select id, username, case when sex = 0 then 'male' else 'female' end sex, case when is_del = 0 then 'no' else 'yes' end isdel from sys_user"
  etlCondition: "where update_time>={}"
  commitBatch: 3000
- dataSourceKey: the key of the srcDataSources entry configured in application.yml
- destination: the Instance name configured in canal.deployer
- groupId: the groupId configured under canalAdapters.groups in application.yml
- _index: the index name
- _id: the field corresponding to the primary key
- upsert: whether updates use upsert
- sql: the mapping SQL
- etlCondition: condition parameter for ETL, which can be used during full synchronization (see the example below)
- commitBatch: the commit batch size
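Full synchronization can be triggered through the adapter's ETL REST interface. A sketch, assuming the adapter's default REST port 8081 and the sys_user.yml task above; the params value fills the {} placeholder of etlCondition, and the exact URL layout may differ slightly between adapter versions:

curl -X POST http://127.0.0.1:8081/etl/es7/sys_user.yml -d "params=2021-01-01 00:00:00"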
SQL mappings support free combinations of multiple table associations, but with certain limitations:
- The primary table cannot be a subquery
- Only left outer join can be used, and the leftmost table must be the primary table
- If a joined secondary table is a subquery, the subquery cannot involve multiple tables
- The main SQL cannot contain a where condition (a where condition inside a secondary-table subquery is allowed but not recommended, as it may cause inconsistent synchronization, for example when a field referenced in the where condition is modified)
- Join conditions only allow '=' between primary/foreign keys; constant conditions are not allowed, for example: on a.role_id = b.id and b.statues = 1
- Fields used in a join condition must appear in the main query; for example, for on a.role_id = b.id, either a.role_id or b.id must appear in the main select statement
The Elasticsearch mapping attributes correspond one-to-one with the values returned by the SQL query (select * is not supported). For example, for select a.id as _id, a.name, a.email as _email from user, the name column maps to the mapping's name field and _email maps to the mapping's _email field; the alias (if any) is used as the final mapped field. The _id alias can then be referenced by the _id: _id setting in the configuration file.
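As an illustration (field names follow the example sentence above, not a table from this deployment), such an alias-based mapping could be written as:

esMapping:
  _id: _id
  sql: "select a.id as _id, a.name, a.email as _email from user a"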
4.4.3. Start and Stop commands
Start:
bin/startup.sh
Stop:
bin/stop.sh
4.5. Outstanding issues
The 1.1.5-SNAPSHOT version used here has not been officially released yet. Its canal-adapter has a bug in cluster deployment: after the ZooKeeper address is configured, the following exception occurs:
java.lang.LinkageError: loader constraint violation: when resolving method "com.alibaba.otter.canal.common.zookeeper.ZkClientx.create(Ljava/lang/String;Ljava/lang/Object;Lorg/apache/zookeeper/CreateMode;)Ljava/lang/String;" the class loader (instance of com/alibaba/otter/canal/connector/core/spi/URLClassExtensionLoader) of the current class, com/alibaba/otter/canal/client/impl/running/ClientRunningMonitor, and the class loader (instance of sun/misc/Launcher$AppClassLoader) for the method's defining class, org/I0Itec/zkclient/ZkClient, have different Class objects for the type org/apache/zookeeper/CreateMode used in the signature
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.initRunning(ClientRunningMonitor.java:122) [tcp-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.client.impl.running.ClientRunningMonitor.start(ClientRunningMonitor.java:93) [tcp-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.client.impl.SimpleCanalConnector.connect(SimpleCanalConnector.java:108) [tcp-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.client.impl.ClusterCanalConnector.connect(ClusterCanalConnector.java:64) [tcp-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.connector.tcp.consumer.CanalTCPConsumer.connect(CanalTCPConsumer.java:59) [tcp-1.1.5-SNAPSHOT-jar-with-dependencies.jar:na]
There are three solutions:
- Run the adapter in standalone mode for the time being, pending an official fix.
- Fix the bug yourself.
- Use MQ mode (the adapter then does not need to register with ZooKeeper).
This BUG has been fixed: github.com/zlt2000/can…
5. Monitoring
By default, canal exposes synchronization-related metrics on port 11112. Real-time synchronization can be monitored by integrating Prometheus and Grafana; the main indicators are listed below:
Indicator | Description |
---|---|
Basic | Basic information about the canal instance. |
Network bandwidth | Network bandwidth, including inbound (bandwidth of the canal server reading the binlog) and outbound (bandwidth of the canal server returning data to the canal client). |
Delay | Delay between the canal server and the master; delay of the store PUT, GET, and ACK operations. |
Blocking | Blocking ratio of the sink thread; blocking ratio of the dump thread (parallel mode only). |
TPS (events) | TPS at which the canal instance consumes binlog events, measured in MySQL binlog events. |
TPS (transaction) | TPS at which the canal instance handles the binlog, measured in MySQL transactions. |
TPS (tableRows) | TPS of the store PUT, GET, and ACK operations, measured in changed table rows. |
Client requests | Number of requests sent by the canal client to the server, classified by request type such as GET, ACK, SUB, and ROLLBACK. |
Client QPS | QPS of client requests to the server, classified by GET and CLIENTACK. |
Empty packets | Count of GET requests for which the server returned an empty result to the canal client. |
Response time | Statistics on the response time of client requests to the server. |
Store remain events | Number of events accumulated in the canal instance ringbuffer. |
Store remain mem | Memory usage of the events accumulated in the canal instance ringbuffer. |
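A minimal Prometheus scrape configuration for pulling these metrics might look like this (assuming a single canal-server reachable on 127.0.0.1 with the default metric port 11112):

scrape_configs:
  - job_name: 'canal'
    static_configs:
      - targets: ['127.0.0.1:11112']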
6. Summary
- Prepare MySQL
  - Enable binlog (ROW mode)
  - Create the user that will be granted synchronization (replication) permissions
  - Create the canal-admin library tables
- Prepare ZooKeeper
- Deploy canal-admin
  - Create the cluster
  - Create the Servers: associate them with the cluster
  - Create the Instance: associate it with the cluster and configure the source database information
- Start canal-deployer
  - Associate it with canal-admin
- Start canal-adapter
  - Associate it with ZooKeeper
  - Configure the source database information
  - Associate it with the Instance
  - Configure the target (ES) information
  - Add the mapping configuration file