Environment to prepare

1. Prepare the JDK8 environment and REDis environment

2. The mysql service

2.1 Build mysql by yourself

2.2 Adding a User Name

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@The '%';
FLUSH PRIVILEGES;
Copy the code

2.3 Checking whether binlog is enabled

show variables like 'log_bin';

2.4 check the binlog_format

show variables like 'binlog_format';

3. The canal

3.1 download the canal

https://github.com/alibaba/canal/releases

3.2 Modifying the Configuration

canal.instance.gtidon=false# # the position info mysql address port canal. The instance. The master. The address =127.0. 01.:3306# show master status; View the binlog file name canal. The instance. The master. Journal. Name = binlog000009.# implement binlog update the location of the canal. The instance. The master. The position =155Canal. The instance. The master. The timestamp = canal. The instance. The master. The gtid = # # username/password canal links database user name Canal. Instance. DbUsername = canal # # password canal. The canal links database instance. The dbPassword = canal canal. Instance. ConnectionCharset = UTF-8
canal.instance.defaultDatabaseName=test
# table regex
canal.instance.filter.regex=.*\\..
Copy the code

3.2.1 canal. The instance. The master. The journal. The name and canal. The instance. The master. The position search way

  • Enter the database.

show master status;

3.3 start the canal

Start canal, /bin/startup.bat (Windows) or /bin/startup.sh (Linux)

4. Code implementation

4.1 CanalClient

import java.net.InetSocketAddress;
import java.util.List;

import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;


public class CanalClient {
    public static void main(String args[]) {
        CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example"."canal"."canal");
        int batchSize = 1000;
        try {
            connector.connect();
            connector.subscribe(". * \ \.. *");
            connector.rollback();
            while (true) {
                Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId == -1 || size == 0) {
                    try {
                        Thread.sleep(1000);
                    } catch(InterruptedException e) { e.printStackTrace(); }}else {
                    processEntry(message.getEntries());
                }
                connector.ack(batchId); // Submit confirmation
                // connector.rollback(batchId); // Data rollback failed}}catch (Exception e) {
            throw e;
        } finally {
            connector.disconnect();
        }
    }

    private static void processEntry(List<Entry> entrys) {
        for (Entry entry : entrys) {
            if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                continue;
            }
            RowChange rowChage = null;
            try {
                rowChage = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
            }
            EventType eventType = rowChage.getEventType();
            System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType);
            // Use database:table: as the key of redis
            String key = entry.getHeader().getSchemaName() + ":" + entry.getHeader().getTableName() + ":";
            for (RowData rowData : rowChage.getRowDatasList()) {
                if (eventType == EventType.DELETE) {
                    redisDelete(rowData.getBeforeColumnsList(), key);
                } else if (eventType == EventType.INSERT) {
                    redisInsert(rowData.getAfterColumnsList(), key);
                } else {
                    System.out.println("-------> before");
                    printColumn(rowData.getBeforeColumnsList());
                    System.out.println("-------> after");
                    redisUpdate(rowData.getAfterColumnsList(), key);
                }
            }
        }
    }

    private static void printColumn(List<Column> columns) {
        for (Column column : columns) {
            System.out.println(column.getName() + ":" + column.getValue() + " update=" + column.getUpdated());
        }
    }

    private static void redisInsert(List<Column> columns, String key) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.set(key + columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisUpdate(List<Column> columns, String key) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.set(key + columns.get(0).getValue(), json.toJSONString());
        }
    }

    private static void redisDelete(List<Column> columns, String key) {
        JSONObject json = new JSONObject();
        for (Column column : columns) {
            json.put(column.getName(), column.getValue());
        }
        if (columns.size() > 0) {
            RedisUtil.delKey(key + columns.get(0).getValue()); }}}Copy the code

4.2 RedisUtil

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;


public class RedisUtil {

    // Redis server IP address
    private static String ADDR = "127.0.0.1";

    // Port number of Redis
    private static int PORT = 6379;

    // Access password
    //private static String AUTH = "admin";

    // Maximum number of available connection instances. Default is 8;
    // If the value is -1, there is no limit; If a pool has already allocated maxActive jedis instances, the pool is in a exhausted state.
    private static int MAX_ACTIVE = 1024;

    // Controls the number of jedis instances in a pool that are idle. The default value is 8.
    private static int MAX_IDLE = 200;

    // The maximum time to wait for available connections, in milliseconds. The default value is -1, indicating that the connection will never timeout. If the wait time is exceeded, JedisConnectionException is thrown directly;
    private static int MAX_WAIT = 10000;

    // Expiration time
    protected static int  expireTime = 60 * 60 *24;

    / / the connection pool
    protected static JedisPool pool;

    /** * static code, only the first time to call */
    static {
        JedisPoolConfig config = new JedisPoolConfig();
        // Maximum number of connections
        config.setMaxTotal(MAX_ACTIVE);
        // Maximum number of free instances
        config.setMaxIdle(MAX_IDLE);
        // The timeout period
        config.setMaxWaitMillis(MAX_WAIT);
        //
        config.setTestOnBorrow(false);
        pool = new JedisPool(config, ADDR, PORT, 1000);
    }

    /** * get jedis instance */
    protected static synchronized Jedis getJedis() {
        Jedis jedis = null;
        try {
            jedis = pool.getResource();
        } catch (Exception e) {
            e.printStackTrace();
            if(jedis ! =null) { jedis.close(); }}return jedis;
    }

    /** * Whether key * exists@param key* /
    public static boolean existKey(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            return jedis.exists(key);
        } catch (Exception e) {
            throw new RuntimeException("existKey error", e);
        } finally {
            if(jedis ! =null) { jedis.close(); }}}/** * delete key *@param key* /
    public static void delKey(String key) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            jedis.del(key);
        } catch (Exception e) {
            throw new RuntimeException("delKey error", e);
        } finally {
            if(jedis ! =null) { jedis.close(); }}}/** * get the value of key *@param key* /
    public static String get(String key) {
        Jedis jedis = null;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.get(key);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            throw new RuntimeException("stringGet error", e);
        } finally {
            if(jedis ! =null) { jedis.close(); }}return lastVal;
    }

    /** * Add string data *@param key
     * @param value* /
    public static String set(String key, String value) {
        Jedis jedis = null;
        String lastVal = null;
        try {
            jedis = getJedis();
            jedis.select(0);
            lastVal = jedis.set(key, value);
            jedis.expire(key, expireTime);
        } catch (Exception e) {
            throw new RuntimeException("stringSet error", e);
        } finally {
            if(jedis ! =null) { jedis.close(); }}return lastVal;
    }

    /** * Add hash data *@param key
     * @param field
     * @param value* /
    public static void hashSet(String key, String field, String value) {
        Jedis jedis = null;
        try {
            jedis = getJedis();
            if(jedis ! =null) {
                jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expireTime); }}catch (Exception e) {
            throw new RuntimeException("hashSet error", e);
        } finally {
            if(jedis ! =null) { jedis.close(); }}}}Copy the code

4.3 Synchronizing Data