The first step of big data analysis in our existing architecture is moving data from a relational database into a non-relational database. There are many solutions on the Internet; we explored and tried three of them in practice, and finally settled on Canal. This article was written by Zhang Tongrui, a colleague in our big data department, and I would like to share it with you. If you are interested, you can explore it further afterwards.
Background
In recent years the concept of microservices has stayed hot, and there is more and more discussion online comparing microservice and monolithic architectures. Facing ever-growing business needs, many companies choose microservices when upgrading their technical architecture. My company also chose this direction for its architecture upgrade, to support more traffic and more convenient business expansion.
The problem
Microservices can be split in two ways: split the service systems without splitting the database, or split both the service systems and the database. If the data size is small, splitting the database is unnecessary, because a split database brings problems such as multi-dimensional data queries and cross-process transactions. However, as the business grew, my company's single database instance could no longer meet business needs, so we chose to split the business systems and the database at the same time, and therefore faced the problems above. This article mainly introduces our real-time query solution for multi-dimensional data. The current system architecture and storage structure are as follows:
Solution approach
- To query data across multiple databases, the databases must first be synchronized into one place to make querying easy
- To cope with large data volumes, a NoSQL database is preferred as the synchronization target
- NoSQL databases cannot perform join queries, so the relational data has to be concatenated in advance and converted into non-relational data
- Multi-dimensional business queries must be real-time, so MongoDB, a NoSQL database with better real-time performance, is chosen
According to the above ideas, the data integration architecture is summarized as follows:
The solution
There are two common approaches to data synchronization: MQ message synchronization and binlog reading synchronization.
Let’s start with MQ message synchronization, which my company tried out for a while and found the following problems:
- Synchronization is built around the business: MQ messages are sent for business-critical data operations, so it depends heavily on the business systems
- The stock data already in the database has to be processed separately
- Tool tables also require separately maintained synchronization
- New MQ logic has to be added every time a table is added
Considering the above problems, synchronizing data with MQ is not the optimal solution.
There are already some mature tools for the binlog-reading approach, such as Tungsten Replicator, but these synchronization tools can only do 1:1 data replication: it is difficult to add custom logic during replication, and collecting data from different databases and different tables into one target is not supported. In summary, the optimal solution is to read the binlog ourselves and handle the subsequent data-processing logic in our own code. The most mature binlog-reading tool at present is probably Alibaba's open-source Canal.
Canal
Canal is Alibaba's incremental subscription & consumption component for the MySQL binlog. Ali Cloud DRDS, Alibaba TDDL secondary indexes, and small-table replication are all based on Canal, and it is widely used. Canal's principle is relatively simple:
- Canal emulates the interaction protocol of a MySQL slave, disguises itself as a MySQL slave, and sends the dump protocol to the MySQL master
- The MySQL master receives the dump request and starts pushing the binary log to the slave (Canal)
- Canal parses the binary log objects (originally byte streams)
Canal's documentation: https://github.com/alibaba/canal/wiki
I use Canal's HA mode, with ZooKeeper electing the available instance, one instance per database. The server configuration is as follows:
Directory:

```
conf/
  database1/
    instance.properties
  database2/
    instance.properties
  canal.properties
```
instance.properties
```properties
canal.instance.mysql.slaveId = 1001
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =
```
canal.properties
```properties
canal.id = 1
canal.ip = X.X.X.X
canal.port = 11111
canal.zkServers = X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024
...
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
```
The deployment data flow is as follows:
Tip: Canal supports both mixed- and row-format binlogs, but when row data is read from a mixed-format log the table name cannot be obtained, so this solution only supports row-format binlogs.
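For reference, row-format logging is enabled on the MySQL side. A sketch of the typical my.cnf settings (these values are illustrative, not taken from the original deployment):

```ini
[mysqld]
log-bin = mysql-bin   # enable binary logging
binlog-format = ROW   # this solution requires row-based events
server-id = 1         # must differ from the slaveId configured for Canal
```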
Data synchronization
Create a Canal Client application to subscribe to the binlog data read by Canal
1. Enable subscriptions to multiple instances
```java
public void initCanalStart() {
    List<String> destinations = canalProperties.getDestination();
    final List<CanalClient> canalClientList = new ArrayList<>();
    if (destinations != null && destinations.size() > 0) {
        for (String destination : destinations) {
            // Fetch the canal server address dynamically from ZooKeeper and set up
            // the connection; if one server crashes, failover is supported
            CanalConnector connector = CanalConnectors.newClusterConnector(
                    canalProperties.getZkServers(), destination, "", "");
            CanalClient client = new CanalClient(destination, connector);
            canalClientList.add(client);
            client.start();
        }
    }
    Runtime.getRuntime().addShutdownHook(new Thread() {
        public void run() {
            try {
                logger.info("## stop the canal client");
                for (CanalClient canalClient : canalClientList) {
                    canalClient.stop();
                }
            } catch (Throwable e) {
                logger.warn("## something goes wrong when stopping canal:", e);
            } finally {
                logger.info("## canal client is down.");
            }
        }
    });
}
```
2. Subscription message processing
```java
private void process() {
    int batchSize = 5 * 1024;
    while (running) {
        try {
            MDC.put("destination", destination);
            connector.connect();
            connector.subscribe();
            while (running) {
                // Fetch a batch of entries without auto-acknowledging
                Message message = connector.getWithoutAck(batchSize);
                long batchId = message.getId();
                int size = message.getEntries().size();
                if (batchId != -1 && size > 0) {
                    saveEntry(message.getEntries());
                }
                connector.ack(batchId); // submit confirmation
                // connector.rollback(batchId); // roll back on processing failure
            }
        } catch (Exception e) {
            logger.error("process error!", e);
        } finally {
            connector.disconnect();
            MDC.remove("destination");
        }
    }
}
```
Process the messages according to the database event: filter the message list and handle the data changes, using the following information:
- insert: schemaName, tableName, afterColumnsList
- update: schemaName, tableName, afterColumnsList
- delete: schemaName, tableName, beforeColumnsList
```java
// This fragment runs inside the loop over entries in saveEntry()
RowChange rowChange = null;
try {
    rowChange = RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
    throw new RuntimeException("parse event has an error, data: " + entry.toString(), e);
}
EventType eventType = rowChange.getEventType();
logger.info(row_format,
        entry.getHeader().getLogfileName(),
        String.valueOf(entry.getHeader().getLogfileOffset()),
        entry.getHeader().getSchemaName(),
        entry.getHeader().getTableName(),
        eventType,
        String.valueOf(entry.getHeader().getExecuteTime()),
        String.valueOf(delayTime));
// Skip plain queries and DDL statements
if (eventType == EventType.QUERY || rowChange.getIsDdl()) {
    logger.info(" sql ----> " + rowChange.getSql());
    continue;
}

DataService dataService = SpringUtil.getBean(DataService.class);
for (RowData rowData : rowChange.getRowDatasList()) {
    if (eventType == EventType.DELETE) {
        dataService.delete(rowData.getBeforeColumnsList(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
    } else if (eventType == EventType.INSERT) {
        dataService.insert(rowData.getAfterColumnsList(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
    } else if (eventType == EventType.UPDATE) {
        dataService.update(rowData.getAfterColumnsList(),
                entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
    } else {
        logger.info("unknown event type: {}", eventType);
    }
}
```
Convert the column list into DBObject, the MongoTemplate data class, with data-type conversion along the way:
```java
public static DBObject columnToJson(List<CanalEntry.Column> columns) {
    DBObject obj = new BasicDBObject();
    try {
        for (CanalEntry.Column column : columns) {
            String mysqlType = column.getMysqlType();
            if (mysqlType.startsWith("int")) {
                // int(n): a display width above 10 may exceed Integer range, so store as Long
                int lenBegin = mysqlType.indexOf('(');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
                    if (length > 10) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));
            } else if (mysqlType.startsWith("bigint")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
            } else if (mysqlType.startsWith("decimal")) {
                // decimal(m,0) has no fractional part, so store as Long; otherwise as Double
                int lenBegin = mysqlType.indexOf('(');
                int lenCenter = mysqlType.indexOf(',');
                int lenEnd = mysqlType.indexOf(')');
                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
                    if (length == 0) {
                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
                        continue;
                    }
                }
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));
            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("date")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
            } else if (mysqlType.equals("time")) {
                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
            } else {
                obj.put(column.getName(), column.getValue());
            }
        }
    } catch (ParseException e) {
        e.printStackTrace();
    }
    return obj;
}
```
Tip: if the same DBObject is used both to save the raw data and to build the composite (or any other) data, make a deep copy of the object first and work on the copy.
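A simple way to deep-copy a DBObject with the driver version of that era is to round-trip it through the driver's JSON helper. A minimal sketch (the DBObjects class name is mine; values with no exact JSON representation may not survive the round trip):

```java
import com.mongodb.DBObject;
import com.mongodb.util.JSON;

public class DBObjects {
    // Deep-copies a DBObject by serializing it to a JSON string and parsing it back
    public static DBObject deepCopy(DBObject original) {
        return (DBObject) JSON.parse(JSON.serialize(original));
    }
}
```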
Data splicing
We concatenate data across the databases. Take two user tables as an example:
```
user_info:       {id, user_no, user_name, user_password}
user_other_info: {id, user_no, idcard, realname}
```
The MongoDB document after splicing looks like this:
```
user: {
  _id,
  user_no,
  userInfo:      {id, user_no, user_name, user_password},
  userOtherInfo: {id, user_no, idcard, realname}
}
```
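With the spliced document, a query that used to require a cross-database join becomes a single MongoDB lookup. A sketch using the field names above (the completeMongoTemplate bean and the literal values are illustrative):

```java
// Query the composite "user" document by fields that originally lived in two
// different tables: realname from user_other_info, user_name from user_info
DBObject query = new BasicDBObject("userOtherInfo.realname", "Zhang San")
        .append("userInfo.user_name", "zhangsan");
DBObject user = completeMongoTemplate.getCollection("user").findOne(query);
```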
Having received all this change data, how do we trigger the splicing operation simply?
Look at the information we can obtain: schemaName, tableName, DBObject, and the event type (insert, update, delete).
Put them together: /schemaName/tableName/event (DBObject). That's right: it looks like a standard RESTful link. If we implement a simple SpringMVC-style dispatcher, we can automatically route each change to the concatenation operation that needs it.
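For illustration, the routing path can be built from a canal entry roughly like this (a sketch: the lower-case event segment and the DBObjectUtil holder for the columnToJson helper above are my assumptions):

```java
// Hypothetical glue code: build the "/schema/table/event" routing key from a
// canal entry and hand the converted row to the doEvent dispatcher shown below
String path = "/" + entry.getHeader().getSchemaName()
        + "/" + entry.getHeader().getTableName()
        + "/" + eventType.name().toLowerCase();
SpringUtil.doEvent(path, DBObjectUtil.columnToJson(rowData.getAfterColumnsList()));
```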
First implement an @Controller analogue: name it @Schema, with value corresponding to schemaName.
```java
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface Schema {
    String value() default "";
}
```
Then implement an @RequestMapping analogue named @Table, reusing Canal's EventType where RequestMethod would normally be:
```java
@Target({ElementType.METHOD, ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Table {
    String value() default "";
    CanalEntry.EventType[] event() default {};
}
```
Then create SpringUtil, implementing the ApplicationContextAware interface; when the application starts it initializes two maps, instanceMap and handlerMap:
```java
@Override
public void setApplicationContext(ApplicationContext applicationContext) {
    if (SpringUtil.applicationContext == null) {
        SpringUtil.applicationContext = applicationContext;
        // Initialize instanceMap data
        instanceMap();
        // Initialize handlerMap data
        handlerMap();
    }
}

private void instanceMap() {
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class<?> clazz = bean.getClass();
        Object instance = applicationContext.getBean(clazz);
        Schema schema = clazz.getAnnotation(Schema.class);
        String key = schema.value();
        instanceMap.put(key, instance);
        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
    }
}

private void handlerMap() {
    ...
}
```
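The body of handlerMap() is elided above. A minimal sketch of what it presumably does, registering one "/schemaName/tableName/event" key per declared event type (this is my reconstruction, not the author's code):

```java
private void handlerMap() {
    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
    for (Object bean : beans.values()) {
        Class<?> clazz = bean.getClass();
        Schema schema = clazz.getAnnotation(Schema.class);
        for (Method method : clazz.getMethods()) {
            Table table = method.getAnnotation(Table.class);
            if (table == null) {
                continue;
            }
            // One routing entry per event type declared on the @Table annotation
            for (CanalEntry.EventType event : table.event()) {
                String key = "/" + schema.value() + "/" + table.value() + "/" + event.name().toLowerCase();
                handlerMap.put(key, method);
                logger.info("handlerMap [{}:{}]", key, method.getName());
            }
        }
    }
}
```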
The dispatch method:
```java
public static void doEvent(String path, DBObject obj) throws Exception {
    String[] pathArray = path.split("/");
    // Expected format "/schemaName/tableName/event", which splits into 4 parts
    if (pathArray.length != 4) {
        logger.info("path format incorrect: {}", path);
        return;
    }
    Method method = handlerMap.get(path);
    Object schema = instanceMap.get(pathArray[1]);
    // If no mapped bean or method is found, do nothing
    if (method == null || schema == null) {
        return;
    }
    try {
        long begin = System.currentTimeMillis();
        logger.info("integrate data: {}, {}", path, obj);
        method.invoke(schema, new Object[]{obj});
        logger.info("integrate data consume: {}ms", System.currentTimeMillis() - begin);
    } catch (Exception e) {
        logger.error("call combination logic exception", e);
        throw new Exception(e.getCause());
    }
}
```
Message handling for the data splicing:
@Schema("demo_user")
public class UserService { @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE}) public void saveUser_UserInfo(DBObject userInfo) { String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString(); DBCollection collection = completeMongoTemplate.getCollection("user"); DBObject queryObject = new BasicDBObject("user_no", userNo); DBObject user = collection.findOne(queryObject); if (user == null) { user = new BasicDBObject(); user.put("user_no", userNo); user.put("userInfo", userInfo); collection.insert(user); } else { DBObject updateObj = new BasicDBObject("userInfo", userInfo); DBObject update = new BasicDBObject("$set", updateObj); collection.update(queryObject, update); }}}
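The user_other_info table would be handled by a symmetrical method in the same class. A hypothetical companion sketch (not in the original source) completing the other half of the splice:

```java
@Table(value = "user_other_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
public void saveUser_UserOtherInfo(DBObject userOtherInfo) {
    String userNo = userOtherInfo.get("user_no") == null ? null : userOtherInfo.get("user_no").toString();
    DBCollection collection = completeMongoTemplate.getCollection("user");
    DBObject queryObject = new BasicDBObject("user_no", userNo);
    DBObject user = collection.findOne(queryObject);
    if (user == null) {
        // No composite document yet: create it with only the userOtherInfo part
        user = new BasicDBObject();
        user.put("user_no", userNo);
        user.put("userOtherInfo", userOtherInfo);
        collection.insert(user);
    } else {
        // Refresh only the userOtherInfo sub-document
        DBObject update = new BasicDBObject("$set", new BasicDBObject("userOtherInfo", userOtherInfo));
        collection.update(queryObject, update);
    }
}
```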
Sample source code:
https://github.com/zhangtr/canal-mongo
Original article: http://www.torry.top/2017/10/22/canal-mongodb/
Recommended reading
- That Microservice thing
- The theoretical basis of microservices architecture: Conway's Law
- What is Spring Cloud doing from an architectural evolution perspective?