Author: Kaisen Kang

About the author: Big data engineer at Meituan and Apache Kylin Committer, currently responsible for building Meituan’s OLAP platform (Kylin, Druid, and Palo).


On August 11, the Apache Kylin Meetup @Beijing, hosted by Kyligence and co-hosted by Meituan Dianping, was successfully held at Meituan headquarters. This article is a transcript of the presentation given there by an Apache Kylin Committer and big data engineer at Meituan. The full text is 6,600 words and takes about 15 minutes to read.


This talk is divided into three parts. First, we introduce the problems of Kylin On HBase and explain why Kylin needs a new storage engine. Second, we describe how we explored candidates for Kylin’s new storage engine and why we chose Druid. Finally, we introduce the core architecture, core principles, performance, and preliminary results of Kylin On Druid, which we will refer to as KOD for short.


Status of Kylin’s service at Meituan Dianping


Before we get to the main topic, let me introduce the current state of Kylin at Meituan Dianping. At present, there are nearly 1,000 Cubes online, and a single replica of the Cube storage is nearly 1 PB. Kylin serves more than 3.8 million queries every day. The TP50 query latency is about 200 ms, and the TP90 latency is about 1.2 s. Kylin now covers all major business lines of Meituan Dianping.


Problems with Kylin On HBase


As Kylin came into large-scale use at our company and we optimized and improved it in depth, we found some pain points in Kylin itself, one of which is the performance of Kylin On HBase. As the figure shows, when we run the same SQL against the same Cube in the same cluster, the query time can differ by more than a thousand times. The only difference between the two queries is the position of the filtered Kylin dimensions in the HBase RowKey. In the query that takes 90 ms, dt and poi_id are the first two dimensions of the RowKey; in the query that takes more than 100 seconds, dt and poi_id are the last two dimensions of the RowKey.


The reasons for the huge difference between prefix and suffix filtering performance in Kylin On HBase are as follows. In Kylin, the Cuboid ID and all dimensions are concatenated into the HBase RowKey, and by default Kylin packs all ordinary metrics into a single value in one Column of one HBase Column Family. HBase has only a single index, on the RowKey. Therefore, query performance is high only when the filter matches a prefix of the RowKey; otherwise a full table Scan is performed and performance is low. In addition, Kylin has to Scan all metric columns in HBase even if the query needs only one metric, which is far slower than column storage would be. In general, Scan and Filter in HBase are inefficient for Kylin’s query scenarios.
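The prefix versus suffix gap can be illustrated with a minimal sketch. It models a sorted key-value store as a sorted Python list; the table contents, the `city` column, and both function names are hypothetical, and this is not HBase’s API. A filter on the leading RowKey columns becomes a binary search over a contiguous range, while a filter on a trailing column has to touch every row.

```python
from bisect import bisect_left, bisect_right

# Hypothetical toy table: rows sorted by RowKey = (dt, poi_id, city),
# mimicking how a sorted KV store orders its keys.
rows = sorted(
    (dt, poi_id, city)
    for dt in range(20180801, 20180811)   # 10 days
    for poi_id in range(100)              # 100 POIs
    for city in range(10)                 # 10 cities
)

def prefix_scan(rows, dt, poi_id):
    """Filter on the leading RowKey columns: binary search to a contiguous range."""
    lo = bisect_left(rows, (dt, poi_id, -1))
    hi = bisect_right(rows, (dt, poi_id, float("inf")))
    return rows[lo:hi], hi - lo            # (matching rows, rows touched)

def suffix_scan(rows, city):
    """Filter on a trailing RowKey column: the key order does not help, so scan all rows."""
    matches = [r for r in rows if r[2] == city]
    return matches, len(rows)              # (matching rows, rows touched)

prefix_matches, prefix_touched = prefix_scan(rows, 20180805, 42)
suffix_matches, suffix_touched = suffix_scan(rows, 3)
```

In this toy table the prefix filter touches 10 rows, while the suffix filter touches all 10,000 rows.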

Given the low Scan and Filter efficiency of Kylin On HBase, the natural solution is to use column storage to speed up Scan and indexes to speed up Filter.

Here I briefly introduce the advantages of column storage, which mainly include the following three points:

  1. Column storage has efficient IO because only the required columns need to be read.
  2. Because the data in each column has the same type and format, column storage can be encoded and compressed efficiently.
  3. Column storage makes vectorized execution easier to implement; vectorized execution has fewer function calls and is friendlier to the CPU cache and SIMD than the traditional volcano model.

In general, column storage is more suitable for Kylin’s query scenarios than HBase’s KV model.

Therefore, to solve the problem of low efficiency of Kylin On HBase Scan and Filter, we need to add a column storage engine with efficient indexes for Kylin.


Exploring a new storage engine for Kylin


Before we could add a storage engine to Kylin, we needed to understand what a storage engine for Kylin consists of. There are five main parts: storage format, cache, computation, scheduling, and metadata. Computation refers to scanning, filtering, and aggregating data. Scheduling refers to adding and deleting files, replication, and load balancing. Metadata refers to the metadata of the storage engine itself. The storage format has a great impact on query performance and is also the pain point of HBase in Kylin’s query scenarios. Therefore, we decided to first find or adapt a storage format suitable for Kylin.


Implementation ideas for Kylin’s new storage engine


At that time, we had two main ideas. The first was to evolve from Spark plus a storage format: find an excellent storage format and integrate it with Spark, cache the files locally, and use Spark for scheduling, computation, and querying. The whole solution is workable. If you are interested in this idea, you can refer to TiDB’s TiSpark project and SnappyData.

The second idea is to find or develop an excellent storage format, and then refer to systems such as HBase and Druid to gradually improve it into a complete storage engine.

Either way, we first needed to find or develop a good storage format for Kylin.


When investigating storage formats, we mainly considered Scan and Filter performance, import performance, compression ratio, and integration difficulty, with Scan and Filter performance as the focus.


Kylin On Parquet POC


We started with Parquet, because it is the de facto standard column file format in the Hadoop ecosystem and is widely used there. A Parquet file is logically divided horizontally into row groups. Within a row group, each column is stored as a column chunk, and each column chunk is further divided into pages, which are the smallest unit of data access. Parquet can filter at row group granularity through min/max indexes and dictionaries, but it does not have page-granularity indexes.
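Row group granularity filtering can be sketched as follows. This is a toy model in plain Python rather than the Parquet library’s API; `build_row_groups`, `filter_equals`, and the sample columns are hypothetical. Each row group keeps min/max statistics, a group is skipped only when the statistics rule it out, and every value in a group that is not skipped still has to be scanned.

```python
def build_row_groups(values, group_size):
    """Split a column into row groups and record min/max statistics per group."""
    groups = []
    for start in range(0, len(values), group_size):
        chunk = values[start:start + group_size]
        groups.append({"min": min(chunk), "max": max(chunk), "values": chunk})
    return groups

def filter_equals(groups, target):
    """Return (matching values, number of row groups that had to be read)."""
    hits, groups_read = [], 0
    for group in groups:
        # The statistics can only exclude a group whose range misses the target.
        if group["min"] <= target <= group["max"]:
            groups_read += 1
            hits.extend(v for v in group["values"] if v == target)
    return hits, groups_read

# Hypothetical data: the same 1000 values, once sorted and once scattered.
sorted_col = list(range(1000))
scattered_col = [(i * 37) % 1000 for i in range(1000)]

sorted_result = filter_equals(build_row_groups(sorted_col, 100), 500)
scattered_result = filter_equals(build_row_groups(scattered_col, 100), 500)
```

With sorted data only one of the ten row groups is read; with scattered data the min/max ranges of all ten groups contain the target, so every group is read to find a single value.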

We carried out the Kylin On Parquet POC in May 2017, and the results were in line with our theoretical expectations. Because Parquet is column storage, its Scan performance is better than HBase’s when only some of the columns are scanned. However, because tuples must be reassembled, Scan performance decreases as the number of accessed columns increases, and it is worse than HBase’s when all columns are scanned. As for Filter, Parquet shows no difference between prefix and suffix filtering. However, because Parquet has no fine-grained, page-level index, its prefix filtering performance is significantly lower than that of HBase.


Kylin On CarbonData

Because of Parquet’s poor filtering performance, we passed on the Kylin On Parquet approach. After Parquet, we investigated CarbonData, a storage format newly open-sourced by Huawei. Like Parquet, CarbonData first splits the data horizontally into several blocklets; within a blocklet the data is stored by column, and each column is called a Column Chunk. Unlike Parquet, CarbonData has a rich set of indexes: the MDK index, the min/max index, and the inverted index. The MDK index is a multidimensional index, similar to the dimension index in Kylin. The entire file is sorted by multiple dimension columns, so prefix filtering on the dimensions in the MDK columns is efficient.

CarbonData’s design of column storage plus rich indexes was exactly what we were hoping for, but CarbonData was deeply coupled with Spark, did not have an OutputFormat, and was not very mature at the time, so we also passed on Kylin On CarbonData.


Kylin On Lucene POC


After Parquet and CarbonData, we ran the Kylin On Lucene POC and the Kylin On Druid POC at about the same time. Let’s look at Lucene first. Lucene is a widely used full-text search library; its storage is mmap-based column storage with support for inverted indexes.

The results of the Kylin On Lucene POC are consistent with our theoretical understanding of inverted indexes: an inverted index pays extra build and storage cost in exchange for efficient filtering at query time. Lucene’s build performance is only one third of HBase’s and its storage footprint is four times that of HBase, but its filtering performance is significantly better than HBase’s.


Kylin On Druid POC


Now let’s look at Druid’s storage format, which is similar to Lucene’s: it is also mmap-based column storage with support for inverted indexes. Druid’s inverted indexes are based on bitmaps, while Lucene’s are based on posting lists. Druid uses a simple column layout; in the figure, M1 and M2 are metric columns and D1 and D2 are dimension columns. In addition to the data file, a meta file records the offset of each column. To read a Druid file, we mmap the file into memory and read the data directly at the recorded offsets.


The top half of the figure shows how Druid’s Bitmap inverted index works. Druid first dictionary-encodes the dimension column: Meituan is encoded as 0 and Dianping as 1. Druid then builds a Bitmap-based inverted index from the dimension values and the dictionary. The Key of the inverted index is the encoded ID and the Value is a Bitmap; each bit that is set to 1 marks a row in which that value appears. The bottom half shows how Druid stores a String dimension: the column metadata, the dimension dictionary, the encoded dimension IDs, and the inverted index.
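The dictionary encoding and bitmap index described above can be sketched in a few lines. This is a simplified illustration, not Druid’s implementation: bitmaps are stored as plain Python integers rather than compressed bitmaps, dictionary IDs are assigned in order of first appearance so that Meituan gets 0 and Dianping gets 1 as in the example, and the `city` column and all function names are hypothetical.

```python
def build_bitmap_index(column):
    """Dictionary-encode a string column and build one bitmap per dictionary ID."""
    dictionary = {}
    for value in column:
        dictionary.setdefault(value, len(dictionary))   # ID = order of first appearance
    encoded = [dictionary[value] for value in column]
    bitmaps = {}
    for row, code in enumerate(encoded):
        bitmaps[code] = bitmaps.get(code, 0) | (1 << row)  # set bit number `row`
    return dictionary, encoded, bitmaps

def rows_of(bitmap):
    """Expand a bitmap (a Python int) into the sorted list of row numbers it marks."""
    return [row for row in range(bitmap.bit_length()) if (bitmap >> row) & 1]

app = ["Meituan", "Dianping", "Meituan", "Dianping", "Meituan"]
city = ["Beijing", "Beijing", "Shanghai", "Shanghai", "Beijing"]

app_dict, app_ids, app_bitmaps = build_bitmap_index(app)
city_dict, city_ids, city_bitmaps = build_bitmap_index(city)

# WHERE app = 'Meituan' AND city = 'Beijing' becomes a bitwise AND of two bitmaps.
both = app_bitmaps[app_dict["Meituan"]] & city_bitmaps[city_dict["Beijing"]]
matching_rows = rows_of(both)
```

Filtering on several dimensions reduces to bitwise operations on the bitmaps, which is why the position of a dimension no longer matters the way it does in a RowKey.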

The figure shows the results of the Kylin On Druid POC. Druid’s filtering performance is significantly better than HBase’s. Its Scan performance is similar to Parquet’s, and significantly better than HBase’s when only some of the columns are scanned. Its build and storage costs are similar to Lucene’s, and worse than HBase’s.


Why Kylin On Druid

Now that we have a basic understanding of the storage formats and POC results for Parquet, CarbonData, Lucene, and Druid, let’s look at why we ended up with Kylin On Druid. First, why we ruled out the alternatives:

  • Parquet’s index granularity is coarse and its filtering performance is poor.
  • CarbonData is deeply coupled with Spark and is difficult to integrate.
  • Lucene has a higher storage bloat rate than Druid and, more importantly, Druid is not only a storage format but can also serve as a complete storage engine for Kylin.

Next, why we chose Kylin On Druid:

  • First, Druid’s Scan and Filter performance is very good.
  • Second, Druid is not only a storage format but can also be used as a complete storage engine for Kylin. For example, Druid’s Historical nodes handle Segment caching and computation, and Druid’s Coordinator nodes handle Segment addition and deletion, replication, and load balancing. This way we don’t have to evolve a storage engine from a storage format, and the overall project time is significantly shorter.
  • Finally, Kylin and Druid are both systems we maintain, so even if the project failed, our effort would not be wasted.



Kylin On Druid


OK, the first half mainly introduced the background of Kylin On Druid and answered the question of why Kylin On Druid. Now let’s look at how Kylin On Druid works.


Kylin pluggable architecture

Before introducing Kylin On Druid, let’s take a look at Kylin’s pluggable architecture. It abstracts data sources, computing engines, and storage engines, all of which are replaceable in theory. Data sources already include Hive and Kafka, and the computing engine supports Spark and MapReduce. However, the only storage engine is HBase. Kylin On Druid replaces HBase with Druid in Kylin’s pluggable architecture.


Kylin On Druid architecture

Here is a simplified Kylin On Druid architecture diagram, divided into a data import path and a data query path. During data import, Kylin’s JobServer imports Hive data into Druid’s Historical nodes. During a query, Kylin’s QueryServer sends the query to Druid through Druid’s Broker nodes.

Let’s take a look at KOD’s data import and query processes in detail, starting with data import:

  1. As with Kylin On HBase, the Cuboids are calculated first.
  2. After the Cuboids are generated, Kylin On HBase converts them into HBase HFiles, whereas KOD converts them into Druid Segment files.
  3. The Segment metadata is inserted into Druid’s MetaData Store. The remaining steps are Druid’s internal process.
  4. The Coordinator periodically pulls Segment metadata from the MetaData Store. When it detects that a new Segment has been generated, it notifies a Historical node to Load the Segment.
  5. After receiving the Load Segment notification, the Historical node downloads the Segment file from HDFS and then mmaps the Segment file into memory. The data import process is then complete.

For KOD’s data query process, there are two ways for Kylin to interact with Druid: access the Broker, or access the Druid Historical nodes directly. In the first way, we only need to translate Kylin’s SQL into Druid’s JSON and send the query to Druid’s Broker over HTTP. In the second way, we need to implement the functionality of the Druid Broker inside Kylin. The advantage of the second way is better performance, because there is one less network hop; the disadvantages are that dependency conflicts between Kylin and Druid are more serious and the implementation is more complex. Following the principle of being less intrusive and simple to use, we chose the first way.
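As a rough illustration of the first way, the sketch below builds a Druid native groupBy query and prepares an HTTP POST to a Broker’s `/druid/v2/` endpoint. It is not KOD’s actual translation code: the helper names, the data source `kylin_cube_demo`, the column names, and the Broker address are all hypothetical, and only equality filters and `longSum` aggregations are handled. The request is constructed but not sent.

```python
import json
from urllib.request import Request

def to_druid_group_by(data_source, dimensions, sum_metrics, equals_filters, interval):
    """Build a Druid native groupBy query from a simplified, SQL-like description."""
    query = {
        "queryType": "groupBy",
        "dataSource": data_source,
        "granularity": "all",
        "dimensions": list(dimensions),
        "aggregations": [
            {"type": "longSum", "name": metric, "fieldName": metric}
            for metric in sum_metrics
        ],
        "intervals": [interval],
    }
    fields = [
        {"type": "selector", "dimension": dim, "value": value}
        for dim, value in equals_filters.items()
    ]
    if len(fields) == 1:
        query["filter"] = fields[0]
    elif fields:
        query["filter"] = {"type": "and", "fields": fields}
    return query

def broker_request(broker_url, query):
    """Prepare (but do not send) the HTTP POST that carries the JSON query to a Broker."""
    body = json.dumps(query).encode("utf-8")
    return Request(
        broker_url.rstrip("/") + "/druid/v2/",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Roughly: SELECT city, SUM(order_cnt) ... WHERE dt = '2018-08-11' AND poi_id = '42' GROUP BY city
query = to_druid_group_by(
    data_source="kylin_cube_demo",
    dimensions=["city"],
    sum_metrics=["order_cnt"],
    equals_filters={"dt": "2018-08-11", "poi_id": "42"},
    interval="2018-08-11/2018-08-12",
)
request = broker_request("http://broker.example:8082", query)
```

Passing `request` to `urllib.request.urlopen` would send the query; the response handling is omitted here.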


Kylin On Druid implementation details


Now that we know about KOD’s architecture and the entire data import and query process, let’s take a look at what we did:

  1. Schema mapping between Kylin and Druid
  2. Cube build-side adaptation
  3. Cube query-side adaptation
  4. O&M tool adaptation

In fact, these four aspects of work are also what we must do to add a storage engine for Kylin.

The Schema mapping between Kylin and Druid looks like this: a Kylin Cube maps to a Druid DataSource, a Kylin Segment maps to a Druid Segment, Kylin’s dimensions map to Druid’s dimension columns, and Kylin’s metrics map to Druid’s metric columns.

So far we have added Kylin’s exact count distinct metric, Kylin’s ExtendColumn metric, and the Decimal metric to Druid. Whether in Druid or in Kylin, the work needed to add an aggregation metric is similar; only the interfaces and implementations differ. To define a custom aggregation metric, we need to do the following:

  • Define the metadata of the metric: its name, parameters, return type, and core data structure;
  • Define how the metric is aggregated;
  • Define how the metric is serialized and deserialized;
  • Register the metric so that the system can discover it.
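The four tasks above can be sketched with a small, self-contained example. This is a hypothetical interface written for illustration, not Druid’s or Kylin’s extension API: the registry, the class, and the method names are assumptions, and a plain Python set stands in for the compressed bitmap a real exact count distinct metric would likely use.

```python
import struct

METRIC_REGISTRY = {}          # hypothetical registry: metric name -> implementing class

def register_metric(cls):
    """Register the metric class so that the (toy) system can discover it by name."""
    METRIC_REGISTRY[cls.name] = cls
    return cls

@register_metric
class ExactDistinctCount:
    # Metadata of the metric: its name and return type.
    name = "exact_distinct"
    return_type = "long"

    def __init__(self, ids=()):
        self.ids = set(ids)   # core data structure (a set stands in for a bitmap)

    # Aggregation: fold in raw values and merge partial results.
    def add(self, value):
        self.ids.add(value)

    def merge(self, other):
        return ExactDistinctCount(self.ids | other.ids)

    def result(self):
        return len(self.ids)

    # Serialization: a count followed by the sorted 64-bit IDs, big-endian.
    def serialize(self):
        ordered = sorted(self.ids)
        return struct.pack(f">I{len(ordered)}q", len(ordered), *ordered)

    @classmethod
    def deserialize(cls, data):
        (count,) = struct.unpack_from(">I", data, 0)
        return cls(struct.unpack_from(f">{count}q", data, 4))

# Two partial aggregates, for example from two segments, merged at query time.
left = ExactDistinctCount([1, 2, 3])
right = ExactDistinctCount([3, 4])
merged = left.merge(right)
restored = ExactDistinctCount.deserialize(merged.serialize())
```

The merge step is what makes the result exact across segments: the partial sets are combined before counting, so the shared ID 3 is counted once.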

On the Cube build side, in addition to the steps that convert Cuboids into Druid Segments and Load the Segments, KOD adds an Update Druid Tier step to support Druid Tier isolation, which allows users to store different Cubes on different Tiers. We optimized both the Segment generation step and the Load step. Generating a Druid Segment requires more memory than generating HBase HFiles because the inverted index has to be built, so we tuned the memory settings for this step. Loading Segments into Druid was very slow when multiple Segments were imported concurrently: Druid 0.10.0 has a bug in its Load system that prevents the default concurrency configuration from taking effect, so Loads run serially, and about 99.9% of the Load Segment time is spent downloading Segment files from HDFS.

Before introducing KOD’s adaptation of the Cube query side, let’s take a quick look at Kylin’s query process. Kylin uses Calcite for SQL parsing and for logical plan generation and optimization. Kylin generates an HBase Scan request according to the query plan produced by Calcite. After HBase performs Scan, Filter, and Agg, it returns the result to Kylin’s QueryServer in one batch. The QueryServer deserializes the result returned by HBase, decodes the dimensions with the dictionary to generate an Object array, and then converts the Object array into a Tuple. The main difference between a Tuple and an Object array is that the Tuple carries type information. The Tuple is handed to Calcite’s Enumerator for the final calculation.

Now let’s look at KOD’s adaptation of the Cube query side. After we get the query plan generated by Calcite, we first convert Kylin’s Filter into a Druid Filter in order to implement predicate push-down. Second, we perform partition pruning to avoid accessing unnecessary Druid Segments. We then generate the Druid query from the Cube, dimensions, metrics, Filter, and so on, and send it to Druid over HTTP. After Druid performs Scan, Filter, and Agg, it pipelines the query results back to Kylin’s QueryServer as HTTP chunks. The rest of the process is similar to HBase. Note that since we store the original values in Druid, queries do not need to load dictionaries to decode dimensions.
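The Segment-skipping step described above can be sketched as an interval overlap test. This is a minimal illustration under the assumption that each Segment covers a half-open time range; the segment names and the `prune_segments` helper are hypothetical and do not reflect KOD’s actual code.

```python
from datetime import date

def prune_segments(segments, query_start, query_end):
    """Keep only segments whose [start, end) range overlaps the query's [start, end)."""
    return [
        seg for seg in segments
        if seg["start"] < query_end and query_start < seg["end"]
    ]

# Hypothetical monthly segments of one Cube.
segments = [
    {"name": "seg_2018_06", "start": date(2018, 6, 1), "end": date(2018, 7, 1)},
    {"name": "seg_2018_07", "start": date(2018, 7, 1), "end": date(2018, 8, 1)},
    {"name": "seg_2018_08", "start": date(2018, 8, 1), "end": date(2018, 9, 1)},
]

# A filter such as dt >= '2018-07-20' AND dt < '2018-08-05' only needs two segments.
needed = prune_segments(segments, date(2018, 7, 20), date(2018, 8, 5))
needed_names = [seg["name"] for seg in needed]
```

Only the July and August segments are queried; the June segment is never touched.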

Finally, let’s look at the adaptation of operation and maintenance tools, mainly including:

  • Cube Migration tool
  • Storage Cleanup tool
  • Cube Purge, Drop, Retention operations, etc.

I won’t go into it here, because it’s mostly a matter of implementation and detail.


Kylin On Druid results

So far, we have seen the overall architecture and core principles of KOD. Now let’s take a look at its performance, starting with the SSB test results. The slide shows the SQL samples and the test environment. Note that we built only the Base Cuboid in both KOD and Kylin, because the comparison is basically meaningless if both are fully precomputed. The purpose of the test is to compare the on-the-fly computing capabilities of KOD and Kylin.

This is the result of the SSB test when KOD and Kylin build only the Base Cuboid. The vertical axis in the figure is the speedup of KOD over Kylin. At a data scale of 10 million the speedup is 4.9, and at a data scale of a billion it is 130. In other words, at large data scales KOD’s on-the-fly computing capability is about two orders of magnitude better than Kylin’s.

This graph shows real online data after the first batch of Cubes went live on KOD. We can see that KOD has improved query performance compared to Kylin, while storage and computing resource usage have decreased significantly.

As mentioned above, KOD’s on-the-fly computing capability is two orders of magnitude higher than Kylin’s. Therefore, for data at the scale of hundreds of millions of rows or less, we do not need to carry out complex optimization and only need to build the Base Cuboid, which makes KOD significantly easier to use.


Kylin On Druid features

After introducing the architecture, core principles and performance of KOD, we will summarize the characteristics of KOD:

  • Fully compatible with Kylin: SQL, Web, etc.
  • Partition pre-filtering (partition pruning)
  • No dictionary loading: higher query stability than Kylin On HBase
  • The storage layer supports service isolation
  • For data at the scale of hundreds of millions of rows or less, only the Base Cuboid needs to be built


Future plans for Kylin On Druid

Finally, let’s look at KOD’s future plans:

  1. A more efficient and streamlined Cube build process: since Druid does not rely on dictionaries, KOD’s Cube build no longer needs to build dictionaries and can drop the several steps associated with them.
  2. Optimization of high-cardinality column query scenarios: this also showed up in the earlier POC. This scenario is not a good fit for inverted indexes, but it is the best case for HBase. We can optimize Druid with reference to LinkedIn’s Pinot, and add a lightweight prefix index like those in Palo and ClickHouse to Druid as an option.
  3. Support for online Schema changes: at present, Schema changes in Kylin are also a big pain point for users, because each Schema change requires a full data refresh.

This talk explained why we added a new storage engine to Kylin, starting from the problems of Kylin On HBase, then described our exploration of storage engines, and finally introduced the architecture and principles of Kylin On Druid.

Finally, one thing to note is that the latest version of Parquet now has PageIndex, which significantly improves the performance of point queries, so Kylin On Parquet may still be worth trying.


FAQ


Q1: Why not just use Druid instead of Kylin?


  1. Kylin’s precomputation is more powerful than Druid’s; Druid can only precompute the Base Cuboid.
  2. Kylin supports exact count distinct based on precomputation, and exact count distinct is a hard requirement at our company.
  3. Kylin’s SQL support is more complete.
  4. Kylin’s offline import is more complete.
  5. Kylin’s query support for star schemas is much friendlier.
  6. Kylin supports Join.
  7. In summary, replacing Kylin with Druid would be far more work than Kylin On Druid.



Q2: Why was Kylin On Kudu not chosen?


  • Kudu has no inverted index or secondary index.
  • Kudu is implemented in C++, while our team’s technology stack is mainly Java, and our storage team has not introduced Kudu.



Q3: Why not optimize and improve HBase instead?


Changing the HBase key-value model to column storage is not just an optimization, but a redesign of the entire system. For reference: kudu.apache.org/faq.html#wh…


Q4: What are the roles of Druid and Kylin?


At our company, Kylin is the main offline OLAP engine and Druid is the main real-time OLAP engine.



About CarbonData’s DataMap features


The most recent version of CarbonData implements the DataMap feature: carbondata.apache.org/datamap-dev…

The primary purpose of DataMap is to provide a single store that supports multiple query scenarios, fulfilling CarbonData’s original vision. The core idea is one copy of the data with multiple indexes: queries in different scenarios are accelerated by different indexes and are automatically routed to the corresponding index. At present, it has implemented the Pre-Aggregate DataMap, Timeseries DataMap, Lucene DataMap, BloomFilter DataMap, and others. Personally, I am optimistic about CarbonData.

