At work, I need to synchronize the full data of the FM database into my own system, and the other party's data is updated in real time. They only provide a synchronization interface, so all of the synchronization work falls to our project team. I had not found a good approach for the incremental data, and my eyes lit up when I saw Canal. So what is Canal?
A brief introduction to Canal
Keywords: incremental log, incremental data subscription and consumption
Canal [kəˈnæl] parses MySQL's incremental logs (the binlog) and provides incremental data subscription and consumption.
In the early days, because Alibaba had dual machine rooms deployed in Hangzhou and the United States, there was a business requirement for cross-machine-room synchronization; it was implemented mainly with business triggers that captured incremental changes. Starting in 2010, the business gradually switched to parsing database logs to obtain incremental changes for synchronization, which gave rise to a large number of incremental database subscription and consumption services.
Let's start with the mechanism Canal builds on:
MySQL master-slave replication
- The MySQL master writes data changes to its binary log (binary log events).
- The MySQL slave copies the master's binary log events to its own relay log.
- The MySQL slave replays the events in the relay log, applying the data changes to its own data.
How Canal works
Canal disguises itself as a MySQL slave: it mimics the slave's interaction protocol and sends a dump request to the MySQL master. The master, on receiving Canal's dump request, starts pushing binary logs to it. Canal then parses the binary log and ships the changes to storage destinations such as MySQL, Kafka, Elasticsearch, etc.
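From the client's point of view, this interaction reduces to a connect → subscribe → fetch → acknowledge loop. Here is a minimal sketch, assuming a local Canal server on its default port 11111 and the default example destination (the full client appears in step 3 below):

```java
import java.net.InetSocketAddress;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.Message;

public class CanalLoopSketch {
    public static void main(String[] args) {
        // connect to the Canal server, which in turn poses as a slave to MySQL
        CanalConnector connector = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        connector.connect();
        connector.subscribe();                          // subscribe to binlog changes
        while (true) {
            Message msg = connector.getWithoutAck(100); // fetch parsed binlog entries
            // ... process msg.getEntries() here ...
            connector.ack(msg.getId());                 // acknowledge the batch
        }
    }
}
```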
What can Canal do
Log-based incremental subscription and consumption covers services such as:
- Database mirroring
- Real-time database backup
- Index building and real-time maintenance (split heterogeneous indexes, inverted indexes, etc.)
- Business cache refresh (see the sketch below)
- Incremental data processing with business logic
Canal currently supports MySQL 5.1.x, 5.5.x, 5.6.x, 5.7.x, and 8.0.x as sources.
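As a taste of the cache-refresh use case above, here is a hedged sketch: it listens for UPDATEs on a hypothetical user table and evicts the matching cache entry. The Cache interface is my own stand-in for whatever cache client you actually use (Redis, Caffeine, etc.):

```java
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.google.protobuf.InvalidProtocolBufferException;

public class CacheRefresher {
    /** Stand-in for a real cache client (Redis, Caffeine, ...). */
    interface Cache { void evict(String key); }

    private final Cache cache;

    public CacheRefresher(Cache cache) { this.cache = cache; }

    /** Evict the cached row whenever Canal reports an UPDATE on the (hypothetical) user table. */
    public void onEntry(CanalEntry.Entry entry) throws InvalidProtocolBufferException {
        CanalEntry.Header header = entry.getHeader();
        if (!"user".equals(header.getTableName())
                || header.getEventType() != CanalEntry.EventType.UPDATE) {
            return;
        }
        CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
        for (CanalEntry.RowData row : rowChange.getRowDatasList()) {
            for (CanalEntry.Column col : row.getAfterColumnsList()) {
                if ("id".equals(col.getName())) {
                    // invalidate the stale entry; the next read repopulates it from the DB
                    cache.evict("user:" + col.getValue());
                }
            }
        }
    }
}
```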
How is Canal used
1. MySQL configuration
Enable the binary log of MySQL. I am using MySQL 8.x and need to add the following configuration in my.ini:
```ini
[mysqld]
# enable the binlog; the log files will be named mysql-bin.*
log-bin=mysql-bin
# log in ROW format so every changed row is recorded
binlog-format=ROW
# must be unique; do not duplicate the slaveId used by Canal
server_id=1
```
Note that on some (Linux) servers the MySQL configuration file is my.cnf rather than my.ini.
Create a Canal user and authorize it
```sql
-- create the canal user with password Canal@123456
CREATE USER 'canal'@'%' IDENTIFIED BY 'Canal@123456';
-- grant the replication privileges Canal needs
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```
Restart the MySQL service (on Windows: net stop mysql followed by net start mysql), then verify that the binlog is enabled; the query should return ON:

```sql
SHOW VARIABLES LIKE 'log_bin';
```
2. Canal installation
Download address: the official GitHub releases page, github.com/alibaba/canal
After the download completes, extract it to a directory and modify the configuration file conf/example/instance.properties:
```properties
## mysql serverId; v1.0.26+ auto-generates it, so it does not have to be configured
# canal.instance.mysql.slaveId=0
# database address
canal.instance.master.address=127.0.0.1:3306
# binlog file name
canal.instance.master.journal.name=mysql-bin.000001
# binlog offset of the master to start from
canal.instance.master.position=154
# username/password of the account authorized on the MySQL server
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@123456
# character set
canal.instance.connectionCharset=UTF-8
# whether to use druid to decrypt the database password
canal.instance.enableDruid=false
# table filter regex: .*\..* means monitor all tables;
# specific table names can also be listed, separated by commas
canal.instance.filter.regex=.*\..*
```
Then run the bin/startup.bat script (bin/startup.sh on Linux).
3. Using Canal from Java
Add the dependency to pom.xml:
```xml
<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.4</version>
</dependency>
```
Create a Spring Boot project and add a CanalClient class that establishes the connection:
```java
import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

public class CanalClient {

    public static void main(String[] args) {
        // 1. Create the connector (Canal server address, destination, username, password)
        CanalConnector connect = CanalConnectors.newSingleConnector(
                new InetSocketAddress("127.0.0.1", 11111), "example", "", "");
        // how many entries to fetch per batch
        int batchSize = 1000;
        boolean running = true;
        while (running) {
            // 2. Establish the connection
            connect.connect();
            // roll back un-acked batches from the last session to prevent data loss
            connect.rollback();
            // 3. Subscribe to the matching binlogs
            connect.subscribe();
            while (running) {
                // fetch a batch without auto-acknowledging
                Message message = connect.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    // no data this round
                } else {
                    printSummary(message);
                }
                // confirm that the batch with this batchId has been consumed
                connect.ack(batchId);
            }
        }
    }

    private static void printSummary(Message message) {
        // iterate over every binlog entry in the batch
        for (CanalEntry.Entry entry : message.getEntries()) {
            // skip transaction begin/end markers
            if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN
                    || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                continue;
            }
            // binlog file name
            String logfileName = entry.getHeader().getLogfileName();
            // binlog offset
            long logfileOffset = entry.getHeader().getLogfileOffset();
            // time the change was executed
            long executeTime = entry.getHeader().getExecuteTime();
            // database name
            String schemaName = entry.getHeader().getSchemaName();
            // table name
            String tableName = entry.getHeader().getTableName();
            // event type: insert/update/delete
            String eventTypeName = entry.getHeader().getEventType().toString().toLowerCase();
            System.out.println("logfileName : " + logfileName);
            System.out.println("logfileOffset : " + logfileOffset);
            System.out.println("executeTime : " + executeTime);
            System.out.println("schemaName : " + schemaName);
            System.out.println("tableName : " + tableName);
            System.out.println("eventTypeName : " + eventTypeName);

            CanalEntry.RowChange rowChange = null;
            try {
                // parse the binary store value into a RowChange entity
                rowChange = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
            } catch (InvalidProtocolBufferException e) {
                e.printStackTrace();
            }
            if (rowChange == null) {
                continue; // parsing failed; skip this entry
            }

            // iterate over every changed row
            for (CanalEntry.RowData rowData : rowChange.getRowDatasList()) {
                // a DELETE only has "before" columns
                if (entry.getHeader().getEventType() == CanalEntry.EventType.DELETE) {
                    System.out.println("---delete---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                // an UPDATE has both "before" and "after" columns
                } else if (entry.getHeader().getEventType() == CanalEntry.EventType.UPDATE) {
                    System.out.println("---update---");
                    printColumnList(rowData.getBeforeColumnsList());
                    System.out.println("---");
                    printColumnList(rowData.getAfterColumnsList());
                // an INSERT only has "after" columns
                } else if (entry.getHeader().getEventType() == CanalEntry.EventType.INSERT) {
                    System.out.println("---insert---");
                    printColumnList(rowData.getAfterColumnsList());
                    System.out.println("---");
                }
            }
        }
    }

    // print every column name and value
    private static void printColumnList(List<CanalEntry.Column> columnList) {
        for (CanalEntry.Column column : columnList) {
            System.out.println(column.getName() + "\t" + column.getValue());
        }
    }
}
```
Run it directly. That completes the Java client code; it does no real processing here, just prints the events, so we can first get an intuitive feel for the data.
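Since the client lives in a Spring Boot project, one way to wire it in is to kick off the blocking fetch loop from a CommandLineRunner when the application starts. This is a sketch only; the class and thread names are my own:

```java
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class CanalRunner implements CommandLineRunner {
    @Override
    public void run(String... args) {
        // run the blocking fetch loop on its own thread so application startup is not blocked
        Thread worker = new Thread(() -> CanalClient.main(new String[0]), "canal-client");
        worker.setDaemon(true);
        worker.start();
    }
}
```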
Finally, we test it: start MySQL, the Canal server, and the Spring Boot project. Then create a table and insert some rows, and the corresponding binlog output appears in the console.
Canal's advantage is that it does not intrude on business code, because it synchronizes data by listening to the binlog. Latency-wise it achieves near real time, and it is in fact a fairly common data synchronization scheme in enterprises.
In addition, Canal supports clustered deployment with an admin console. In real projects we usually run it in MQ mode with RocketMQ or Kafka: Canal publishes the change data to MQ topics, and the message queue's consumers process it from there. We will dig into that setup later.
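For reference, switching Canal to MQ mode is mostly configuration. A rough sketch for Kafka, assuming Canal 1.1.4 (the key names changed in later releases, so check the canal.properties that ships with your version):

```properties
# conf/canal.properties — deliver parsed binlog to an MQ instead of the TCP client
canal.serverMode = kafka
canal.mq.servers = 127.0.0.1:9092

# conf/example/instance.properties — which topic/partition this instance writes to
canal.mq.topic = example
canal.mq.partition = 0
```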