This article is from the NetEase Cloud Community.

Author: Wang Sheng


Related Concepts

To understand the sharding problem in Kylin storage and queries, we first need two important concepts: segment and cuboid. Readers working in big data are probably already familiar with them. Kylin generates a new segment every time a build task is submitted; users typically build once a day, so a new segment is generated each day to store the previous day's data. Kylin's core idea is pre-aggregation: it pre-computes the user-defined dimension combinations and stores the results in HBase, so queries can read the pre-computed results directly and return very quickly. Each combination of dimensions is called a cuboid. A build generates data for many cuboids (one per dimension combination), which is ultimately written to HBase as HFiles. Kylin assigns a unique ID to each cuboid (all segments of a cube share the same set of cuboids and cuboid IDs). The ID is determined by the order of the dimension columns defined on the cube. Here is a simple example: if a table has three columns A, B and C, then all cuboid combinations are:

cuboid    cuboid_id
ABC       7 (111)
AB        6 (110)
AC        5 (101)
A         4 (100)
BC        3 (011)
B         2 (010)
C         1 (001)


Here the cube's dimension columns are ordered A, B, C (the binary form of each ID is shown in parentheses), and users can adjust this order when designing the cube. When the data is finally stored in HBase, the RowKey concatenates the dimension values in this same order (the RowKey contains other parts as well, which are not covered here). It is recommended to put dimensions that are frequently filtered on, or that have high cardinality, at the front to improve scan efficiency.
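To make the bit layout concrete, here is a small standalone sketch (illustrative only, not Kylin source code) that enumerates the cuboid IDs for dimensions A, B and C in rowkey order, with the first dimension taking the highest bit; running it reproduces the table above.

public class CuboidIdDemo {
    public static void main(String[] args) {
        String[] dims = {"A", "B", "C"};   // rowkey order defined on the cube
        int n = dims.length;
        // the base cuboid (all dimensions) has ID 2^n - 1; iterate down to 1
        for (long cuboidId = (1L << n) - 1; cuboidId >= 1; cuboidId--) {
            StringBuilder combo = new StringBuilder();
            for (int i = 0; i < n; i++) {
                // dimension i occupies bit (n - 1 - i), so the first dimension is the highest bit
                if ((cuboidId & (1L << (n - 1 - i))) != 0) {
                    combo.append(dims[i]);
                }
            }
            String bits = String.format("%" + n + "s", Long.toBinaryString(cuboidId)).replace(' ', '0');
            System.out.println(combo + " " + cuboidId + " (" + bits + ")");
        }
    }
}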


Sharding During Builds

During Kylin's build process, the data of each cuboid is divided into several shards (a shard corresponds to a region in HBase). Each segment keeps two members, cuboidShardNums and totalShards, shown below:

// key is the cuboid ID; value is the number of regions (shards) that cuboid occupies
private Map<Long, Short> cuboidShardNums = Maps.newHashMap();
private int totalShards = 0;

Note that a region may store data from multiple cuboids, and a cuboid may span multiple regions, so the relationship between cuboids and regions is many-to-many. Kylin exposes the following configuration items to control the regions generated during a build:

// size of a single region
kylin.storage.hbase.region-cut-gb
// minimum number of regions
kylin.storage.hbase.min-region-count
// maximum number of regions
kylin.storage.hbase.max-region-count
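For reference, these are set in kylin.properties; the values below are purely illustrative (tune them to your own cluster and data volume):

kylin.storage.hbase.region-cut-gb=5
kylin.storage.hbase.min-region-count=1
kylin.storage.hbase.max-region-count=500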

With these three configuration items you can control the number and size of the regions generated by each build and tune them accordingly. The segment's shard information is also derived from these parameters, as follows:

float cut = cubeDesc.getConfig().getKylinHBaseRegionCut();
int nRegion = Math.round((float) (totalSizeInM / (cut * 1024L)));
nRegion = Math.max(kylinConfig.getHBaseRegionCountMin(), nRegion);
nRegion = Math.min(kylinConfig.getHBaseRegionCountMax(), nRegion);
// ... rest of the code omitted

Here, cut is the region split threshold set by kylin.storage.hbase.region-cut-gb, and totalSizeInM is the size of the data produced by the build (the sum of all cuboid data). From these we can compute the totalShards value of the segment, i.e. nRegion. The number of shards occupied by each cuboid is then calculated by the following code:

int mbPerRegion = (int) (totalSizeInM / nRegion);
for (long cuboidId : allCuboids) {
    double estimatedSize = cubeSizeMap.get(cuboidId);
    double magic = 23;
    int shardNum = (int) (estimatedSize * magic / mbPerRegion + 1);
    if (shardNum < 1) {
        shardNum = 1;
    }
    // ... rest of the code omitted
}

First the actual size of each region, mbPerRegion, is calculated; then the number of regions occupied by each cuboid, shardNum, is derived from that cuboid's estimated data size, estimatedSize. The magic factor spreads a cuboid's data across as many regions as possible, so that multiple regions can be scanned in parallel at query time to improve query efficiency. Once cuboidShardNums and totalShards are settled, we still need to determine the starting region of each cuboid's data (combined with shardNum, this pins down where all of that cuboid's data lives). The starting region ID is derived from the cuboid ID and the total number of regions; interested readers can check the source (ShardingHash.java):

short startShard = ShardingHash.getShard(cuboidId, nRegion);
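As a rough worked example with made-up numbers (an illustrative sketch following the formulas above, not Kylin source): suppose the region cut is 5 GB, the min/max region counts are 1 and 500, and the build produced 12288 MB of cuboid data in total.

public class ShardCalcExample {
    public static void main(String[] args) {
        float cut = 5f;                 // kylin.storage.hbase.region-cut-gb (assumed value)
        double totalSizeInM = 12288;    // total size of all cuboid data, in MB (assumed value)

        int nRegion = Math.round((float) (totalSizeInM / (cut * 1024L)));  // round(2.4) = 2
        nRegion = Math.max(1, nRegion);                                    // min-region-count = 1
        nRegion = Math.min(500, nRegion);                                  // max-region-count = 500
        int mbPerRegion = (int) (totalSizeInM / nRegion);                  // 6144 MB per region

        double estimatedSize = 300;     // one cuboid estimated at 300 MB
        double magic = 23;
        int shardNum = (int) (estimatedSize * magic / mbPerRegion + 1);    // (int) 2.12 = 2 shards

        System.out.println("nRegion=" + nRegion
                + ", mbPerRegion=" + mbPerRegion
                + ", shardNum=" + shardNum);
    }
}

The starting region of that cuboid would then come from ShardingHash.getShard(cuboidId, nRegion), which maps the cuboid ID onto one of the nRegion regions.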

The segment uses the cuboidBaseShards member to hold the mapping from cuboid ID to starting region ID, as shown below:

private Map<Long, Short> cuboidBaseShards = Maps.newConcurrentMap();

This is how the segment sharding problem is handled during the Kylin build process.

Sharding During Queries

Once a new segment has been generated, its data can be queried. From the analysis above we know that the build output of a segment is really a collection of cuboid data sets. At query time, Kylin picks the cuboid that best matches the columns in the SQL (when joins are involved, more than one cuboid may match) and then scans the segments by cuboid ID. Kylin creates a CubeSegmentScanner for each segment to be scanned; while scanning a segment, it looks up the region information for the cuboid (the starting region ID and the number of regions) from the cuboid ID. The main processing logic is as follows:

private List<Pair<byte[], byte[]>> getEPKeyRanges(short baseShard, short shardNum, int totalShards) {
    if (shardNum == 0) {
        return Lists.newArrayList();
    }
    if (shardNum == totalShards) {
        // this cuboid's data is distributed across all regions
        return Lists.newArrayList(Pair.newPair(getByteArrayForShort((short) 0),
                getByteArrayForShort((short) (shardNum - 1))));
    } else if (baseShard + shardNum <= totalShards) {
        // this cuboid's data is distributed across regions with consecutive IDs
        return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard),
                getByteArrayForShort((short) (baseShard + shardNum - 1))));
    } else {
        // wrap-around case: e.g. with regions 0,1,2,3,4 the data sits in regions 4 and 0
        return Lists.newArrayList(Pair.newPair(getByteArrayForShort(baseShard),
                getByteArrayForShort((short) (totalShards - 1))),
                Pair.newPair(getByteArrayForShort((short) 0),
                getByteArrayForShort((short) (baseShard + shardNum - totalShards - 1))));
    }
}

private byte[] getByteArrayForShort(short v) {
    byte[] split = new byte[Bytes.SIZEOF_SHORT];
    BytesUtil.writeUnsigned(v, split, 0, Bytes.SIZEOF_SHORT);
    return split;
}
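To make the three branches concrete, here is a minimal standalone sketch (not Kylin source) of the same range-splitting idea, working on plain shard numbers instead of the byte[] keys used above; with 5 regions in total, a cuboid with baseShard = 4 and shardNum = 2 wraps around into the two ranges [4, 4] and [0, 0].

import java.util.ArrayList;
import java.util.List;

public class ShardRangeDemo {
    // returns [start, end] pairs of region IDs covered by a cuboid's data
    static List<int[]> shardRanges(int baseShard, int shardNum, int totalShards) {
        List<int[]> ranges = new ArrayList<>();
        if (shardNum == 0) {
            return ranges;
        }
        if (shardNum == totalShards) {
            ranges.add(new int[]{0, shardNum - 1});                            // all regions
        } else if (baseShard + shardNum <= totalShards) {
            ranges.add(new int[]{baseShard, baseShard + shardNum - 1});        // consecutive regions
        } else {
            ranges.add(new int[]{baseShard, totalShards - 1});                 // tail of the region list ...
            ranges.add(new int[]{0, baseShard + shardNum - totalShards - 1});  // ... wrapping to the head
        }
        return ranges;
    }

    public static void main(String[] args) {
        for (int[] r : shardRanges(4, 2, 5)) {       // prints [4, 4] and [0, 0]
            System.out.println("[" + r[0] + ", " + r[1] + "]");
        }
    }
}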

With getEPKeyRanges, the key ranges of the regions to scan are obtained for each segment. Kylin's data lives in HBase, so the scans are executed there: for each region, Kylin starts a thread that sends a scan request to HBase, and the results of all the scans are aggregated and returned to the upper layer. To speed up scanning, Kylin also uses an HBase coprocessor to pre-aggregate the scan results of each region; coprocessor details are not covered here, see the source (CubeHBaseEndpointRPC.java and CubeVisitService.java).

At this point the sharding problem in Kylin storage and queries has been largely covered. Some details of how Kylin stores data in HBase were omitted and may be filled in later; interested readers are welcome to discuss and learn together.

