Environment to prepare
1. Prepare a JDK 8 environment and a Redis environment
2. The MySQL service
2.1 Set up a MySQL server yourself
2.2 Adding a database user for canal
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
Copy the code
2.3 Checking whether binlog is enabled
show variables like 'log_bin';
2.4 Checking the binlog_format
show variables like 'binlog_format';
3. Canal
3.1 Download canal
https://github.com/alibaba/canal/releases
3.2 Modifying the Configuration
canal.instance.gtidon=false
## position info: MySQL address and port
canal.instance.master.address=127.0.0.1:3306
# binlog file name, as reported by `show master status;`
canal.instance.master.journal.name=binlog.000009
# binlog position from which canal starts reading
canal.instance.master.position=155
canal.instance.master.timestamp=
canal.instance.master.gtid=
## username/password
# user name canal uses to connect to the database
canal.instance.dbUsername=canal
# password canal uses to connect to the database
canal.instance.dbPassword=canal
canal.instance.connectionCharset=UTF-8
canal.instance.defaultDatabaseName=test
# table regex
canal.instance.filter.regex=.*\\..*
Copy the code
3.2.1 How to find the values for canal.instance.master.journal.name and canal.instance.master.position
- Log in to the database and run:
show master status;
3.3 Start canal
Start canal by running bin/startup.bat (Windows) or bin/startup.sh (Linux) from the canal installation directory.
4. Code implementation
4.1 CanalClient
import java.net.InetSocketAddress;
import java.util.List;
import com.alibaba.fastjson.JSONObject;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;
import com.alibaba.otter.canal.client.*;
public class CanalClient {
public static void main(String args[]) {
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(), 11111), "example"."canal"."canal");
int batchSize = 1000;
try {
connector.connect();
connector.subscribe(". * \ \.. *");
connector.rollback();
while (true) {
Message message = connector.getWithoutAck(batchSize); // Get the specified amount of data
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
try {
Thread.sleep(1000);
} catch(InterruptedException e) { e.printStackTrace(); }}else {
processEntry(message.getEntries());
}
connector.ack(batchId); // Submit confirmation
// connector.rollback(batchId); // Data rollback failed}}catch (Exception e) {
throw e;
} finally {
connector.disconnect();
}
}
private static void processEntry(List<Entry> entrys) {
for (Entry entry : entrys) {
if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
continue;
}
RowChange rowChage = null;
try {
rowChage = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e);
}
EventType eventType = rowChage.getEventType();
System.out.printf("================> binlog[%s:%s] , name[%s,%s] , eventType : %s%n",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType);
// Use database:table: as the key of redis
String key = entry.getHeader().getSchemaName() + ":" + entry.getHeader().getTableName() + ":";
for (RowData rowData : rowChage.getRowDatasList()) {
if (eventType == EventType.DELETE) {
redisDelete(rowData.getBeforeColumnsList(), key);
} else if (eventType == EventType.INSERT) {
redisInsert(rowData.getAfterColumnsList(), key);
} else {
System.out.println("-------> before");
printColumn(rowData.getBeforeColumnsList());
System.out.println("-------> after");
redisUpdate(rowData.getAfterColumnsList(), key);
}
}
}
}
private static void printColumn(List<Column> columns) {
for (Column column : columns) {
System.out.println(column.getName() + ":" + column.getValue() + " update=" + column.getUpdated());
}
}
private static void redisInsert(List<Column> columns, String key) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.set(key + columns.get(0).getValue(), json.toJSONString());
}
}
private static void redisUpdate(List<Column> columns, String key) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.set(key + columns.get(0).getValue(), json.toJSONString());
}
}
private static void redisDelete(List<Column> columns, String key) {
JSONObject json = new JSONObject();
for (Column column : columns) {
json.put(column.getName(), column.getValue());
}
if (columns.size() > 0) {
RedisUtil.delKey(key + columns.get(0).getValue()); }}}Copy the code
4.2 RedisUtil
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
public class RedisUtil {
// Redis server IP address
private static String ADDR = "127.0.0.1";
// Port number of Redis
private static int PORT = 6379;
// Access password
//private static String AUTH = "admin";
// Maximum number of available connection instances. Default is 8;
// If the value is -1, there is no limit; If a pool has already allocated maxActive jedis instances, the pool is in a exhausted state.
private static int MAX_ACTIVE = 1024;
// Controls the number of jedis instances in a pool that are idle. The default value is 8.
private static int MAX_IDLE = 200;
// The maximum time to wait for available connections, in milliseconds. The default value is -1, indicating that the connection will never timeout. If the wait time is exceeded, JedisConnectionException is thrown directly;
private static int MAX_WAIT = 10000;
// Expiration time
protected static int expireTime = 60 * 60 *24;
/ / the connection pool
protected static JedisPool pool;
/** * static code, only the first time to call */
static {
JedisPoolConfig config = new JedisPoolConfig();
// Maximum number of connections
config.setMaxTotal(MAX_ACTIVE);
// Maximum number of free instances
config.setMaxIdle(MAX_IDLE);
// The timeout period
config.setMaxWaitMillis(MAX_WAIT);
//
config.setTestOnBorrow(false);
pool = new JedisPool(config, ADDR, PORT, 1000);
}
/** * get jedis instance */
protected static synchronized Jedis getJedis() {
Jedis jedis = null;
try {
jedis = pool.getResource();
} catch (Exception e) {
e.printStackTrace();
if(jedis ! =null) { jedis.close(); }}return jedis;
}
/** * Whether key * exists@param key* /
public static boolean existKey(String key) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.select(0);
return jedis.exists(key);
} catch (Exception e) {
throw new RuntimeException("existKey error", e);
} finally {
if(jedis ! =null) { jedis.close(); }}}/** * delete key *@param key* /
public static void delKey(String key) {
Jedis jedis = null;
try {
jedis = getJedis();
jedis.select(0);
jedis.del(key);
} catch (Exception e) {
throw new RuntimeException("delKey error", e);
} finally {
if(jedis ! =null) { jedis.close(); }}}/** * get the value of key *@param key* /
public static String get(String key) {
Jedis jedis = null;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.get(key);
jedis.expire(key, expireTime);
} catch (Exception e) {
throw new RuntimeException("stringGet error", e);
} finally {
if(jedis ! =null) { jedis.close(); }}return lastVal;
}
/** * Add string data *@param key
* @param value* /
public static String set(String key, String value) {
Jedis jedis = null;
String lastVal = null;
try {
jedis = getJedis();
jedis.select(0);
lastVal = jedis.set(key, value);
jedis.expire(key, expireTime);
} catch (Exception e) {
throw new RuntimeException("stringSet error", e);
} finally {
if(jedis ! =null) { jedis.close(); }}return lastVal;
}
/** * Add hash data *@param key
* @param field
* @param value* /
public static void hashSet(String key, String field, String value) {
Jedis jedis = null;
try {
jedis = getJedis();
if(jedis ! =null) {
jedis.select(0); jedis.hset(key, field, value); jedis.expire(key, expireTime); }}catch (Exception e) {
throw new RuntimeException("hashSet error", e);
} finally {
if(jedis ! =null) { jedis.close(); }}}}Copy the code
4.3 Synchronizing Data