Canal is an Alibaba open source project. Its main purpose is to provide incremental data subscription and consumption based on parsing the binlog of a MySQL database.
Services built on log-based incremental subscription and consumption include:
- Database mirroring
- Real-time Database backup
- Index building and real-time maintenance (split heterogeneous index, inverted index, etc.)
- Service Cache Refresh
- Incremental data processing with business logic
I mainly use it in two scenarios.
The first is synchronizing changed data to Elasticsearch and Redis in real time.
My current practice has two parts. One is a full, scheduled synchronization: because the data volume is large and a sync takes a long time, the data is never fresh enough. The other is per-record changes: some of the Elasticsearch and Redis logic is written directly into the business code, so the coupling is severe.
Once this is stripped out into a binlog-based pipeline, updates become real-time and incremental, and the business code is decoupled; the benefits are substantial.
The second scenario is preserving the change history of data we care about.
This is currently used in the "Asset Management" module, which implements IP lifecycle management by recording the creation, modification and deletion of IP assets, making historical information easy to trace.
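To make the second scenario concrete, here is a minimal sketch of turning a parsed binlog event (the dict produced by the client code later in this article) into an append-only audit record. The function name `build_audit_record` and the record layout are illustrative assumptions, not part of Canal:

```python
import json
import time

# Map Canal event_type codes to readable action names.
EVENT_NAMES = {1: 'INSERT', 2: 'UPDATE', 3: 'DELETE'}

def build_audit_record(event, ts=None):
    """Flatten a parsed binlog event into an audit-log row (a sketch)."""
    return {
        'db': event['db'],
        'table': event['table'],
        'action': EVENT_NAMES.get(event['event_type'], 'UNKNOWN'),
        # Serialize the change payload so it can be stored in a text column.
        'detail': json.dumps(event['data'], sort_keys=True),
        'recorded_at': ts if ts is not None else int(time.time()),
    }

record = build_audit_record(
    {'db': 'test', 'table': 'role', 'event_type': 1,
     'data': {'id': '10', 'role_name': 'admin'}},
    ts=1607843285,
)
print(record['action'])  # INSERT
```

Appending rows like this to a history table is enough to trace who-changed-what over an asset's lifecycle.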
MySQL configuration
In MySQL's my.cnf, enable binlog writing and set the format to ROW:
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must not duplicate canal's slaveId
Restart the database, then check whether the configuration has taken effect.
mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW |
+---------------+-------+
1 row in set (0.19 sec)
mysql>
mysql> show variables like 'log_bin';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| log_bin | ON |
+---------------+-------+
1 row in set (0.00 sec)
mysql>
mysql> show master status;
+------------------+----------+--------------+------------------+-------------------+
| File | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+------------------+----------+--------------+------------------+-------------------+
| mysql-bin.000003 | 4230 | | | |
+------------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)
Then create the user and authorize it.
mysql> CREATE USER canal IDENTIFIED BY 'canal';
mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
mysql> FLUSH PRIVILEGES;
mysql> show grants for 'canal'@'%';
+--------------------------------------------------------------------------+
| Grants for canal@% |
+--------------------------------------------------------------------------+
| GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO `canal`@`%` |
+--------------------------------------------------------------------------+
1 row in set (0.00 sec)
Canal server
Pull the image:
# docker pull canal/canal-server:v1.1.4
Then use the official shell script to launch directly:
# sh run.sh -e canal.auto.scan=false \
  -e canal.destinations=test \
  -e canal.instance.master.address=127.0.0.1:3306 \
  -e canal.instance.dbUsername=canal \
  -e canal.instance.dbPassword=canal \
  -e canal.instance.connectionCharset=UTF-8 \
  -e canal.instance.tsdb.enable=true \
  -e canal.instance.gtidon=false
I prefer to manage the container with docker-compose; here is the compose file:
version: '3'
services:
  canal-server:
    image: canal/canal-server:v1.1.4
    container_name: canal-server
    restart: unless-stopped
    network_mode: host
    ports:
      - 11111:11111
    environment:
      - canal.auto.scan=false
      - canal.instance.master.address=127.0.0.1:3306
      - canal.instance.dbUsername=canal
      - canal.instance.dbPassword=canal
      - canal.instance.filter.regex=.*\\..*
      - canal.destinations=test
      - canal.instance.connectionCharset=UTF-8
      - canal.instance.tsdb.enable=true
    volumes:
      - /root/canal/test/log/:/home/admin/canal-server/logs/
Start the service:
# docker-compose up
Recreating canal-server ... done
Attaching to canal-server
canal-server | DOCKER_DEPLOY_TYPE=VM
canal-server | ==> INIT /alidata/init/02init-sshd.sh
canal-server | ==> EXIT CODE: 0
canal-server | ==> INIT /alidata/init/fix-hosts.py
canal-server | ==> EXIT CODE: 0
canal-server | ==> INIT DEFAULT
canal-server | Generating SSH1 RSA host key: [ OK ]
canal-server | Starting sshd: [ OK ]
canal-server | Starting crond: [ OK ]
canal-server | ==> INIT DONE
canal-server | ==> RUN /home/admin/app.sh
canal-server | ==> START ...
canal-server | start canal ...
canal-server | start canal successful
canal-server | ==> START SUCCESSFUL ...
The Canal Python client
The official sample client code can be used almost as-is:
import time

from canal.client import Client
from canal.protocol import EntryProtocol_pb2
from canal.protocol import CanalProtocol_pb2

client = Client()
client.connect(host='127.0.0.1', port=11111)
client.check_valid(username=b'', password=b'')
# Subscribe to all tables of all databases (filter is a regex).
client.subscribe(client_id=b'1001', destination=b'test', filter=b'.*\\..*')

while True:
    message = client.get(100)
    entries = message['entries']
    for entry in entries:
        entry_type = entry.entryType
        # Skip transaction begin/end markers; the row data is what we want.
        if entry_type in [EntryProtocol_pb2.EntryType.TRANSACTIONBEGIN,
                          EntryProtocol_pb2.EntryType.TRANSACTIONEND]:
            continue
        row_change = EntryProtocol_pb2.RowChange()
        row_change.MergeFromString(entry.storeValue)
        header = entry.header
        database = header.schemaName
        table = header.tableName
        event_type = header.eventType
        for row in row_change.rowDatas:
            format_data = dict()
            if event_type == EntryProtocol_pb2.EventType.DELETE:
                for column in row.beforeColumns:
                    format_data[column.name] = column.value
            elif event_type == EntryProtocol_pb2.EventType.INSERT:
                for column in row.afterColumns:
                    format_data[column.name] = column.value
            else:
                # UPDATE: keep the before and after images in separate dicts.
                format_data['before'] = dict()
                format_data['after'] = dict()
                for column in row.beforeColumns:
                    format_data['before'][column.name] = column.value
                for column in row.afterColumns:
                    format_data['after'][column.name] = column.value
            data = dict(
                db=database,
                table=table,
                event_type=event_type,
                data=format_data,
            )
            print(data)
    time.sleep(1)

client.disconnect()
Functional verification
First create a test database and table in MySQL, then insert, update, and delete a row:
mysql> create database test;
mysql> use test;
mysql> CREATE TABLE `role` (
    ->   `id` int unsigned NOT NULL AUTO_INCREMENT,
    ->   `role_name` varchar(255) DEFAULT NULL,
    ->   PRIMARY KEY (`id`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;
mysql> insert into role (id, role_name) values (10, 'admin');
Query OK, 1 row affected (0.01 sec)
mysql> update role set role_name='hh' where id = 10;
Query OK, 1 row affected (0.01 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> delete from role where id = 10;
Query OK, 1 row affected (0.01 sec)
Client printout:
$ python canal_client.py
connected to 127.0.0.1:11111
Auth succed
Subscribe succed
header {
version: 1
logfileName: "mysql-bin.000003"
logfileOffset: 5497
serverId: 1
serverenCode: "UTF-8"
executeTime: 1607843285000
sourceType: MYSQL
eventLength: 75
}
entryType: TRANSACTIONBEGIN
storeValue: " \217\001"
header {
version: 1
logfileName: "mysql-bin.000003"
logfileOffset: 5630
serverId: 1
serverenCode: "UTF-8"
executeTime: 1607843285000
sourceType: MYSQL
schemaName: "test"
tableName: "role"
eventLength: 47
eventType: INSERT
props {
key: "rowsCount"
value: "1"
}
}
entryType: ROWDATA
storeValue: "\010\322\001\020\001P\000bN\022 \010\000\020\004\032\002id \001(\0010\000B\00210R\014int unsigned\022*\010\001\020\014\032\trole_name \000(\0010\000B\005adminR\014varchar(255)"
{'db': 'test', 'table': 'role', 'event_type': 1, 'data': {'id': '10', 'role_name': 'admin'}}
header {
version: 1
logfileName: "mysql-bin.000003"
logfileOffset: 5677
serverId: 1
serverenCode: "UTF-8"
executeTime: 1607843285000
sourceType: MYSQL
eventLength: 31
}
entryType: TRANSACTIONEND
storeValue: "\022\003440"
For a single data change, the output consists of TRANSACTIONBEGIN, ROWDATA, and TRANSACTIONEND entries. The content we care about is in ROWDATA; once parsed, it gives us everything we need: database name, table name, and the changed values.
In the event_type field, 1 means insert, 2 means update, and 3 means delete.
Update corresponding output:
{'db': 'test', 'table': 'role', 'event_type': 2, 'data': {'before': {'id': '10', 'role_name': 'admin'}, 'after': {'id': '10', 'role_name': 'hh'}}}
Delete corresponding output:
{'db': 'test', 'table': 'role', 'event_type': 3, 'data': {'id': '10', 'role_name': 'hh'}}
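With events parsed into this shape, the decoupling described at the start becomes straightforward: route each event to a handler by event_type instead of embedding Elasticsearch/Redis writes in business code. The sketch below uses a plain dict as a stand-in for the downstream store; the handler names and the assumption that every row has an `id` column are illustrative, not part of Canal:

```python
# Canal event_type codes, as observed in the output above.
INSERT, UPDATE, DELETE = 1, 2, 3

def handle_upsert(event, store):
    data = event['data']
    # For UPDATE the payload carries before/after images; keep the "after" one.
    doc = data['after'] if event['event_type'] == UPDATE else data
    store[(event['table'], doc['id'])] = doc

def handle_delete(event, store):
    store.pop((event['table'], event['data']['id']), None)

HANDLERS = {INSERT: handle_upsert, UPDATE: handle_upsert, DELETE: handle_delete}

def dispatch(event, store):
    """Route one parsed binlog event to its handler (a sketch)."""
    handler = HANDLERS.get(event['event_type'])
    if handler:
        handler(event, store)

cache = {}  # stand-in for Redis or an Elasticsearch index
dispatch({'db': 'test', 'table': 'role', 'event_type': INSERT,
          'data': {'id': '10', 'role_name': 'admin'}}, cache)
dispatch({'db': 'test', 'table': 'role', 'event_type': UPDATE,
          'data': {'before': {'id': '10', 'role_name': 'admin'},
                   'after': {'id': '10', 'role_name': 'hh'}}}, cache)
print(cache[('role', '10')]['role_name'])  # hh
```

Swapping the dict for real Redis or Elasticsearch clients keeps all sync logic in one place, outside the business code.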
After the canal server starts, two log files, meta.log and test.log, are generated in the /home/admin/canal-server/logs/test directory; they can be used to check whether the service is running properly and whether any errors are reported. Here test is the canal.destinations value set when the container was started.
# cat meta.log
2020-12-13 14:55:18.051 - clientId:1001 cursor:[mysql-bin.000003,4805,1607842360000,1,] address[/127.0.0.1:3306]
2020-12-13 14:55:33.051 - clientId:1001 cursor:[mysql-bin.000003,5096,1607842531000,1,] address[127.0.0.1:3306]
2020-12-13 14:57:07.051 - clientId:1001 cursor:[mysql-bin.000003,5387,1607842625000,1,] address[127.0.0.1:3306]
# cat test.log
2020-12-13 14:55:09.067 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2020-12-13 14:55:09.144 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2020-12-13 14:55:09.144 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2020-12-13 14:55:09.693 [destination = test , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000003,position=4699,serverId=1,gtid=,timestamp=1607842360000] cost : 538ms , the next step is binlog dump
A pitfall
Everything was fine in the test environment I set up myself, but I hit a problem when deploying to the project's beta environment:
[fetch failed by table meta: schemeName.tableName]
A search suggested that this table-metadata error could be skipped by adding the following configuration:
canal.instance.filter.table.error=true
After adding it, the error messages did disappear, but the consumed data no longer contained ROWDATA, which bothered me for a long time.
Honestly, when debugging, an explicit error is not what you should fear; what you should fear is no error at all while the program still misbehaves.
Later, I removed the ignore-table-error configuration and looked at the logs again. There was another error:
Caused by: java.io.IOException: ErrorPacket [errorNumber=1142, fieldCount=-1, message=SHOW command denied to user
It turned out the binlog account configured in that environment did not have the SELECT permission, which Canal needs in order to query table metadata; granting SELECT (as in the GRANT statement earlier) solved it.
When something goes wrong, it pays to calm down and read the logs.
That's all for this article; the next one will cover wiring Canal up to MQ.
Reference Documents:
Github.com/alibaba/can…
Github.com/haozi315666…