The premise

After installing MySQL (I installed 5.7), install JDK (canal dependency)

Enable MySQL binlog

To retrieve the binary contents of CURD, turn on binlog and change the format of binlog to Row. CNF: /etc/my.cnf: / mysqld: /etc/my.cnf: / mysqld: / mysqld

Log-bin =mysql-bin # select ROW mode server_id=1 #Copy the code

Verify that binlog is enabled

Log in to MySQL and run the following command:

Show variables like 'log_%';Copy the code

If log_bin is on, the function is enabled.

Assign a MySQL account to Canal

Give Canal a MySQL account so that Canal can steal MySQL’s binlog.

CREATE USER canal IDENTIFIED BY 'canal';
GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Copy the code

Check whether permissions are assigned to the Canal account

show grants for 'canal' 
Copy the code

Download from canal

Address: github.com/alibaba/can… , the current stable version is V1.1.0, download ccanal. Deployer-1.1.0.tar.gz. Unzip to the canal directory (new if you don’t have it)

1.8.0_65-b17 = 1.8.0_65-b17 = 1.8.0_65-b17

#downloadWget HTTP: / / https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.deployer-1.1.0.tar.gz#Create the canal directory
mkdir canal
#Unpack theThe tar ZXVF - https://github.com/alibaba/canal/releases/download/canal-1.1.0/canal.deployer-1.1.0.tar.gzCopy the code

The canal and instance profiles

A canal may have multiple instances, which means that one instance can monitor one mysql instance, and multiple instances can correspond to multiple mysql instances on multiple servers. In other words, a single CANAL can monitor multiple MySQL machines under sub-database sub-table.

(1) canal. The properties

The canal.properties file in canal/config is the global configuration of the CANAL server and has the following modifications:

################################################# ######### common argument ############# # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # # id only. Canal. id= 2 canal. IP = canal.port=11111 canal.metrics.pull.port=11112 canal.zkServers= # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, To exceed runtime.getruntime (). AvailableProcessors () # parallelThreadSize = 1 to 14 1 CPU allocated, causing an error. Instead of 1 canal. The instance. The parser. ParallelThreadSize = 1 # # disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable=true  canal.instance.tsdb.dir=${canal.file.data.dir:.. /conf}/${canal.instance.destination:} canal.instance.tsdb.url=jdbc:h2:${canal.instance.tsdb.dir}/h2; CACHE_SIZE=1000; MODE=MYSQL; canal.instance.tsdb.dbUsername=canal canal.instance.tsdb.dbPassword=canal # rds oss binlog account canal.instance.rds.accesskey = canal.instance.rds.secretkey = ################################################# ######### destinations ############# ################################################# canal.destinations= example # conf root dir canal.conf.dir = .. /conf # auto scan instance dir add/remove and start/stop instance canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml=classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml=classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = spring Canal. Instance. Global. Lazy = false # canal. Instance. Global. Manager. Address = 127.0.0.1:1099 #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml # position The info, Need to own the database information canal. Instance. Master. Address = 127.0.0.1:3306 canal. The instance. The master. The journal. The name = Canal. The instance. The master. The position = canal. The instance. The master. The timestamp = # username/password, Need to own the database information canal. Instance. DbUsername = canal canal. The instance. The dbPassword = canal canal. Instance. DefaultDatabaseName = test canal.instance.connectionCharset = UTF-8 # table regex canal.instance.filter.regex = .*\\.. *Copy the code
(2) the instance. The properties

Located in canal/example/instance properties, is a specific instances of instance configuration, not involved in the configuration will be from the canal. The properties on the inheritance, content is as follows:

################################################# ## mysql serverId , V1.0.26 + will autoGen # canal. The instance. The mysql. SlaveId = 0 # enable gtid use true/false canal. The instance. The gtidon # = false Position info address change for your mysql address canal. The instance. The master. The address = 192.168.204.128:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true # canal. The instance. The TSDB. Url = JDBC: mysql: / / 127.0.0.1: # 3306 / canal_tsdb canal. The instance. The TSDB. DbUsername = canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password Amended as in mysql to account password canal. The canal synchronous data instance. The dbUsername = canal canal. Instance. DbPassword = # canal monitoring database canal.instance.defaultDatabaseName=test canal.instance.connectionCharset=UTF-8 # table regex canal.instance.filter.regex=.*\\.. * # table black regex canal.instance.filter.black.regex= #################################################Copy the code

Create test database

Check whether the test database exists on the MySQL database

Open canal

Go to canal/bin and run./startup.sh.

Use the ps – ef | grep canal to investigate whether open.

The Java client code

Create a SpringBoot project and import dependencies:

<dependency>
     <groupId>com.alibaba.otter</groupId>
     <artifactId>canal.client</artifactId>
     <version>1.1.0</version>
 </dependency>
Copy the code

Create TestCanal class:

package com.xbq.canal.test;

import java.awt.Event;
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.Header;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.Message;
import com.google.protobuf.InvalidProtocolBufferException;

/ * * *@Auther: xbq
 * @Date: 2018/9/11 he *@Description: * /
public class TestCanal {

    public static void main(String[] args)  throws InterruptedException {
        // Step 1: Connect to canal
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.204.128".11111),
                "example".""."");
        connector.connect();
        // Step 2: Enable subscription
        connector.subscribe();
        // Step 3: Repeat the subscription
        while (true) {
            try {
                // Read 1000 entries at a time
                Message message = connector.getWithoutAck(1000);
                long batchID = message.getId();
                int size = message.getEntries().size();
                if (batchID == -1 || size == 0) {
                    System.out.println("No data available at this time.");
                    Thread.sleep(1000);
                } else {
                    System.out.println("-- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- - to have the data -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- --");
                    PrintEntry(message.getEntries());
                }
                // position id ack
                connector.ack(batchID);
            } catch (Exception e) {
                // TODO: handle exception
            } finally {
                Thread.sleep(1000); }}}/** * Get each printed record *@param entrys
     */
    public static void PrintEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            // Step 1: Disassemble the Entry entity
            Header header = entry.getHeader();
            EntryType entryType = entry.getEntryType();

            // Step 2: If the current is RowData, that is the data I need
            if (entryType == EntryType.ROWDATA) {
                String tableName = header.getTableName();
                String schemaName = header.getSchemaName();
                RowChange rowChange = null;
                try {
                    rowChange = RowChange.parseFrom(entry.getStoreValue());
                } catch (InvalidProtocolBufferException e) {
                    e.printStackTrace();
                }
                EventType eventType = rowChange.getEventType();
                System.out.println(String.format("%s.%s, Action= %s", schemaName, tableName, eventType));

                // If the operation is' query 'or' DDL ', then the SQL is typed directly
                if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
                    System.out.println("rowchange sql ----->" + rowChange.getSql());
                    return;
                }
                // Step 3: trace to the columns level
                rowChange.getRowDatasList().forEach((rowData) -> {
                    // Get the column before the update
                    List<Column> beforeColumns = rowData.getBeforeColumnsList();
                    // Get the updated column
                    List<Column> afterColumns = rowData.getAfterColumnsList();
                    // The current operation is delete
                    if (eventType == EventType.DELETE) {
                        PrintColumn(beforeColumns);
                    }
                    // This is an insert operation
                    if (eventType == EventType.INSERT) {
                        PrintColumn(afterColumns);
                    }
                    // The update operation is currently performed
                    if(eventType == EventType.UPDATE) { PrintColumn(afterColumns); }}); }}}/** * Changes to each column on each row *@param columns
     */
    public static void PrintColumn(List<Column> columns) {
        columns.forEach((column) -> {
            String columnName = column.getName();
            String columnValue = column.getValue();
            String columnType = column.getMysqlType();
            // Check whether the field is updated
            boolean isUpdated = column.getUpdated();
            System.out.println(String.format("columnName=%s, columnValue=%s, columnType=%s, isUpdated=%s", columnName, columnValue, columnType, isUpdated)); }); }}Copy the code

Run this class. MySQL > alter table student; alter table student; alter table student;

reference

Cache consistency and data heterogeneity solution for cross-server queries by Canal