Introduction

Counting order revenue is a commonplace requirement for an e-commerce app, usually in the form of daily/monthly/annual user revenue reports. Report data of this kind poses a real challenge for table design and program design: as the income table grows, an ordinary aggregate query takes longer and longer to run. At that point we have to ask: how should the income tables be designed so queries stay efficient? What design makes revenue easy to aggregate?

Requirements

Mockup

(mockup image of the target revenue report omitted)

Specific requirements

  • Revenue types: self-purchase order revenue, shared order revenue, distribution revenue, and activity revenue
  • Show the current day's income and the current month's income
  • Compute revenue over an arbitrary user-selected time range

Approach

Design ideas

An order table is clearly needed. Whenever the order table is written or modified, write the corresponding change to the income table as well. Only self-purchase and shared orders produce rows in the order table; distribution and activity-giveaway revenue are special transactions recorded only in the income table.

On top of that, build a user income daily report keyed by day: a single row records a user's revenue for that day. This shrinks the amount of data scanned by daily, monthly, and annual revenue queries. Taking a single user as an example, the split produces at most 31 rows per user per month, a controllable growth rate. If the income table were queried directly, its size would track the number of orders the user places, so a heavy buyer would make the table very large. While the user base is still growing, this approach avoids aggregating over large data volumes; later, if growth makes even the daily report large, splitting the table can be considered.
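To make the one-row-per-user-per-day design concrete, the daily report table could look roughly like this (a hedged sketch; the table and column names are invented for illustration and are not from the original article):

```sql
-- Hypothetical daily revenue report: exactly one row per user per day,
-- enforced by the unique key, with one column per revenue type.
CREATE TABLE t_user_income_daily (
    id           BIGINT UNSIGNED NOT NULL AUTO_INCREMENT,
    user_id      BIGINT UNSIGNED NOT NULL COMMENT 'user id',
    income_date  DATE            NOT NULL COMMENT 'report day',
    self_income  DECIMAL(12, 2)  NOT NULL DEFAULT 0 COMMENT 'self-purchase order revenue',
    share_income DECIMAL(12, 2)  NOT NULL DEFAULT 0 COMMENT 'shared order revenue',
    dist_income  DECIMAL(12, 2)  NOT NULL DEFAULT 0 COMMENT 'distribution revenue',
    act_income   DECIMAL(12, 2)  NOT NULL DEFAULT 0 COMMENT 'activity revenue',
    PRIMARY KEY (id),
    UNIQUE KEY uk_user_date (user_id, income_date)
) COMMENT 'daily user revenue report';
```

With this shape, a monthly report for one user touches at most 31 rows, and a date-range query is a simple SUM over `income_date BETWEEN ? AND ?`.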

Known problems

  • When to write the daily income report is a problem: order processing is already complex, and adding both the income write and the daily-report computation to it couples the code too tightly. Is there a way to derive the daily report from the income table itself?
  • Even with revenue written into a daily report, producing the report shown in the mockup may still take several SQL queries. Is there a way to cut down the aggregate SQL and make the program more efficient?

Implementation

With the questions above in mind, I started gathering material and finally settled on Canal + RocketMQ as the heterogeneous-data scheme.

Technology stack

A brief introduction to these two technical frameworks:

  • Canal: provides incremental data subscription and consumption by parsing the MySQL database's incremental (binlog) logs.
  • RocketMQ: an open-source distributed messaging system that provides low-latency, highly reliable message publish/subscribe on top of highly available distributed clustering.

Note: I use Alibaba Cloud across the board; both MQ and MySQL are Alibaba Cloud services. A self-hosted setup may differ in places; I will point out the differences below where relevant.

Solution process


  1. When the income table is written or modified, Canal watches the binlog of the MySQL income table.
  2. Canal detects the change, assembles a JSON message describing it, and sends it to the topic defined in RocketMQ.
  3. The application consumes the topic and builds the heterogeneous daily revenue report.

Canal configuration

Following the official Canal installation documentation, extract the archive into a canal folder containing three directories:

  • bin: startup and restart scripts
  • conf: core configuration files
  • lib: core JAR packages

We need to focus on two files in the conf folder: conf/canal.properties, the core configuration file, and conf/example/instance.properties, the configuration file for a single monitored instance.

conf/canal.properties

# serverMode supports tcp, kafka, and RocketMQ; the default is tcp, change it to RocketMQ
canal.serverMode = RocketMQ
# Aliyun RocketMQ authenticates with an AK/SK key pair
canal.aliyun.accessKey = xxxxxxx
canal.aliyun.secretKey = xxxxxxx
# Names of the instances to monitor; the default is example, multiple instances are comma-separated
canal.destinations = example,sign
# For Aliyun RocketMQ, accessChannel must be changed to cloud (the default is local)
canal.mq.accessChannel = cloud
# MQ endpoint: do not include http://, but the port number is required
canal.mq.servers =
# RocketMQ instance id
canal.mq.namespace =

conf/example/instance.properties

# MySQL address
canal.instance.master.address=
# Run `show master status` in MySQL: journal.name takes the File column,
# position takes the Position column
canal.instance.master.journal.name=
canal.instance.master.position=
# database account and password
canal.instance.dbUsername=
canal.instance.dbPassword=
# tables whose changes should be monitored
canal.instance.filter.regex=xxx.t_user_order,xxx.t_user_cash_out
# producer group used when sending to MQ
canal.mq.producerGroup =
# topic the messages are sent to
canal.mq.topic=

Note: for the syntax of the table filter (canal.instance.filter.regex), refer to the filter rules in the official Canal documentation.
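As a quick orientation, the filter accepts comma-separated regex rules of the form schema.table; the patterns below follow the examples in the official Canal documentation (shown here as an illustrative sketch, not copied from this article's setup):

```
# all tables in all schemas:        .*\\..*
# all tables under schema "test":   test\\..*
# a single table:                   test.t_user_order
# multiple rules, comma separated:  test.t_user_order,test.t_user_cash_out
```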

Startup

cd /canal/bin
./start.sh

Then check the main canal log file and the example instance's startup log:

In the canal log:
the canal server is running now ......
In the example log:
init table filter : ^tablename
xxxxxxxxx , the next step is binlog dump

If both appear, you have taken a big step: Canal monitoring is up and running.

RocketMQ part

If you use Aliyun's RocketMQ, the consumer configuration can follow the official documentation directly; self-built RocketMQ can likewise follow the simple consumption example. Listen on the corresponding topic to consume the data Canal sends, which arrives in the following format:

{
    "data": [
        {
            // the changed rows; several simultaneous changes produce several objects here
        }
    ],
    "database": "database of the monitored table",
    "es": "time of the table change",
    "id": "id generated by Canal",
    "isDdl": "boolean: whether this is a DDL statement",
    "mysqlType": { "table structure" },
    "old": "for UPDATE events, the values before the change",
    "pkNames": [ "primary key of the table, e.g. \"id\"" ],
    "sql": "the SQL that was executed",
    "sqlType": { "type codes corresponding to the fields in mysqlType" },
    "table": "name of the monitored table",
    "ts": "time at which Canal sent the message",
    "type": "table change type: INSERT, UPDATE, or DELETE"
}

The MQ consumption code relies mainly on reflection to map messages onto the corresponding table entities:

// body is the message payload sent by Canal
public Action process(String body) {
    boolean result = Boolean.FALSE;
    JSONObject data = JSONObject.parseObject(body);
    log.info("Database operation log: data:{}", data.toString());
    Class c = null;
    try {
        // We monitor the order and income tables to build order statistics and daily income statistics
        c = Class.forName(getClassName(data.getString("table")));
    } catch (ClassNotFoundException e) {
        log.error("error {}", e);
    }
    if (null != c) {
        JSONArray dataArray = data.getJSONArray("data");
        if (dataArray != null) {
            // Convert the "data" section into a list of the reflected entity type
            List list = dataArray.toJavaList(c);
            if (CollUtil.isNotEmpty(list)) {
                // Handle modifications and inserts with separate logic
                String type = data.getString("type");
                if ("UPDATE".equals(type)) {
                    result = uppHistory(list);
                } else if ("INSERT".equals(type)) {
                    result = saveHistory(list);
                }
            }
        }
    }
    return result ? Action.CommitMessage : Action.ReconsumeLater;
}

/**
 * @description: build the ClassName used for reflection
 * @author: chenyunxuan
 */
private String getClassName(String tableName) {
    StringBuilder sb = new StringBuilder();
    // Pick the package based on which table the data belongs to
    if (tableName.equals("t_user_income_detail")) {
        sb.append("cn.mc.core.model.order");
    } else if (tableName.equals("t_user_cash_out")) {
        sb.append("cn.mc.sync.model");
    }
    String className = StrUtil.toCamelCase(tableName).substring(1);
    return sb.append(".").append(className).toString();
}

/**
 * @description: write to the statistics table matching the entity type
 * @author: chenyunxuan
 */
private <T> Boolean saveHistory(List<T> orderList) {
    boolean result = Boolean.FALSE;
    Object dataType = orderList.get(0);
    // Use instanceof to route each entity type to its own handling logic
    if (dataType instanceof TUserIncomeDetail) {
        result = userOrderHistoryService.saveIncomeDaily(orderList);
    } else if (dataType instanceof UserCashOut) {
        result = userCashOutHistoryService.delSaveHistoryList(orderList);
    }
    return result;
}
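The getClassName method above leans on Hutool's StrUtil.toCamelCase. For readers without Hutool, here is a hypothetical JDK-only re-implementation of the same table-name-to-class-name mapping (the package names are the ones from the article; the camel-case helper is my own and capitalizes the first letter, which getClassName's substring(1) then drops, so the result matches):

```java
// Hypothetical, dependency-free sketch of the article's getClassName logic.
public class ClassNameResolver {

    // Map e.g. "t_user_cash_out" -> "cn.mc.sync.model.UserCashOut"
    static String getClassName(String tableName) {
        StringBuilder sb = new StringBuilder();
        // Pick the package based on which table the data belongs to
        if (tableName.equals("t_user_income_detail")) {
            sb.append("cn.mc.core.model.order");
        } else if (tableName.equals("t_user_cash_out")) {
            sb.append("cn.mc.sync.model");
        }
        // substring(1) drops the leading "T" that the "t_" table prefix produces
        String className = toCamelCase(tableName).substring(1);
        return sb.append(".").append(className).toString();
    }

    // Convert snake_case to CamelCase: "t_user_cash_out" -> "TUserCashOut"
    static String toCamelCase(String name) {
        StringBuilder out = new StringBuilder();
        boolean upper = true; // capitalize the first letter and each letter after '_'
        for (char ch : name.toCharArray()) {
            if (ch == '_') {
                upper = true;
            } else {
                out.append(upper ? Character.toUpperCase(ch) : ch);
                upper = false;
            }
        }
        return out.toString();
    }
}
```

This keeps the reflection entry point free of third-party string utilities while preserving the same class names.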

saveIncomeDaily pseudocode

public synchronized Boolean saveIncomeDaily(List orderList) {
    // loop over the income detail rows
    // using the creation time and user id, check whether the daily report
    // already holds a row for the current day
    if (no row exists for that day) {
        // create the daily revenue row for that day
    }
    // an empty row for the day is written immediately even when there is no
    // record yet, so everything below is an update
    // update the current day's data
    return Boolean.TRUE;
}

Note: log generously in this code so that abnormal income data is easy to reconcile later.

Afterword

This completes the Canal + RocketMQ heterogeneous scheme for daily revenue statistics. The next article will tackle the second problem raised above: reducing the amount of aggregate SQL. Stay tuned.