On October 26, the ByteDance Technology Salon | Big Data Architecture event wrapped up at ByteDance's Shanghai headquarters. We invited Guo Jun, head of data warehouse architecture at ByteDance; Tao Jiatao, big data R&D engineer at Kyligence; Xu Mingmin, storage engineer at ByteDance; and Bai Chen, senior technical expert at Alibaba Cloud, to share their experience.
The following is a write-up of the talk "Apache Kylin Principle and New Architecture (Kylin On Parquet)", shared by Tao Jiatao, big data R&D engineer at Kyligence.
Hello everyone, my name is Tao Jiatao, and I am a big data development engineer at Kyligence. Since graduation I have worked on the commercial version of Apache Kylin, mainly on the next-generation query and build engine based on Spark. Today's agenda has three parts: I'll start with a brief introduction to Apache Kylin and how it executes queries; then I'll introduce Parquet Storage, which my team has been working on and expects to contribute back to the open source community by the end of this year; finally, I'll talk about precise deduplication (exact count distinct), which is very widely used in the community, along with its implementation in Kylin and some extensions.
Kylin usage scenarios
Apache Kylin™ is an open source distributed analytics engine that provides a SQL query interface and multidimensional analysis (OLAP) capabilities on top of Hadoop/Spark, supporting extremely large-scale data. It was originally developed by eBay and contributed to the open source community. It can query huge Hive tables with sub-second latency.
As a SQL acceleration layer, Kylin can connect to various data sources such as Hive and Kafka, and to various BI systems such as Tableau and Power BI; it can also serve ad hoc queries directly.
If your product or business team tells you that a batch of queries is too slow and asks you to speed them up, with high query concurrency, low resource usage, full SQL syntax support, seamless BI integration, and no extra machines for you, you might consider Apache Kylin.
Apache Kylin fundamentals
The core idea of Kylin is precomputation: it calculates all possible query results in advance according to specified dimensions and measures, trading space for time to accelerate OLAP queries with fixed query patterns.
Kylin is built on Cube theory. Each combination of dimensions is called a Cuboid, and the set of all Cuboids is the Cube. The Cuboid composed of all dimensions is called the Base Cuboid; in the figure, (time, item, location, supplier) is the Base Cuboid, and every other Cuboid can be computed from it. A Cuboid can be understood as a precomputed wide table. At query time, Kylin automatically selects the most suitable Cuboid that satisfies the query. For example, the query in the figure above will hit the Cuboid (time, item, location). Fetching data from a Cuboid greatly reduces the amount of data scanned and computed.
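To make the Cuboid concept concrete, here is a small illustrative sketch (my own, not Kylin's actual code) that enumerates every Cuboid derivable from a Base Cuboid:

```scala
// Illustrative only: enumerate all cuboids (non-empty subsets of dimensions)
// that a base cuboid of (time, item, location, supplier) gives rise to.
object CuboidEnumeration {
  def cuboids(base: List[String]): Seq[List[String]] =
    (1 to base.size).flatMap(k => base.combinations(k))

  def main(args: Array[String]): Unit = {
    val all = cuboids(List("time", "item", "location", "supplier"))
    println(all.size) // 2^4 - 1 = 15 cuboids, the base cuboid included
    all.foreach(c => println(c.mkString("(", ", ", ")")))
  }
}
```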
Apache Kylin's basic query flow
The first three steps are common to all query engines; Kylin performs them with the Apache Calcite framework. There are many resources on the web covering Calcite, so we won't expand on it here.
This section focuses on the last two steps: Kylin adaptation and query execution. Why does Kylin need an adaptation step? Because after precomputation the Cube data no longer matches the original table's semantics, the SQL's execution plan must be rewritten before it can query the Cube. Consider the following example:
Suppose a user has an item access table (stock) whose columns item and user_id record which user accessed which item, and the user wants to analyze each item's PV. The user defines a Cube whose dimension is item and whose measure is COUNT(user_id), then issues the following SQL:
```sql
SELECT item, COUNT(user_id) FROM stock GROUP BY item;
```
After this SQL reaches Kylin, Kylin cannot use its original semantics to look up the Cube data. This is because, after precomputation, only one row exists per item key: rows with the same item key in the original table have already been aggregated away, producing a new measure column that records how many user_ids accessed each item key. So the rewritten SQL looks something like this:
```sql
SELECT item, SUM(M_C) FROM stock GROUP BY item;
```
Why is there still a SUM/GROUP BY operation instead of just fetching the data and returning it? Because the Cuboid hit by the query may contain more dimensions than item; that is, the matched Cuboid is not the most precise one, so the data must be aggregated again across those extra dimensions. Even so, compared with the user's original table, the amount of data and computation over partially aggregated data is greatly reduced. And if the query exactly matches a Cuboid, we can skip the Agg/GROUP BY step entirely, as shown in the following figure.
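To make both cases concrete, here is a hedged sketch expressed with Spark DataFrames purely for illustration (the parquet paths and the measure column name M_C are hypothetical; this shows the idea, not Kylin's code):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.sum

// Illustrative sketch; paths and column names are made up.
val spark = SparkSession.builder().appName("cuboid-match-sketch").getOrCreate()

// Partial match: the stored cuboid has dimensions (item, location), but the
// query groups by item only, so the pre-aggregated measure M_C must be
// summed again across the extra dimension.
val partial = spark.read.parquet("/cube/cuboid_item_location")
  .groupBy("item").agg(sum("M_C").as("pv"))

// Exact match: a cuboid keyed by item alone already holds one row per item,
// so the Agg/GROUP BY step can be skipped and rows returned directly.
val exact = spark.read.parquet("/cube/cuboid_item").select("item", "M_C")
```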
Both Agg and Join involve shuffle operations, so with large data volumes they perform poorly and consume more resources, which also hurts query concurrency.
After precomputation, the two most time-consuming operations, Agg and Join, disappear from the rewritten execution plan (with an exact Cuboid match). Going further, when defining the cube we can choose to sort by the ORDER BY column, so the Sort operation need not be computed either. The whole computation is then a single stage with no shuffle; only a few tasks need to be started, and query concurrency improves as well.
Kylin On HBase
Basic principle
In the current open source implementation, the built Cube data is stored in HBase. Above, we obtained a logical execution plan that queries Cube data; the Calcite framework generates the corresponding physical execution plan from it, and each operator then produces its own executable code via code generation. Execution follows the iterator model: data flows from the bottom TableScan operator up through the operators above it. Because the whole process resembles a volcano erupting, it is also called the Volcano Iterator Model. The code generated for TableScan retrieves Cube data from HBase; once the data returns to Kylin's Query Server, it is consumed by the upper operators.
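As a mental model, here is a minimal hand-written sketch of the Volcano/Iterator idea (Calcite actually code-generates the operators; this version is only for illustration):

```scala
// Minimal sketch of the Volcano Iterator Model: every operator is an
// iterator that pulls rows from its child, so data flows bottom-up.
type Row = Map[String, Any]

class TableScan(rows: Seq[Row]) extends Iterator[Row] {
  // In Kylin On HBase, this is where generated code fetches Cube data from HBase.
  private val it = rows.iterator
  def hasNext: Boolean = it.hasNext
  def next(): Row = it.next()
}

class Filter(child: Iterator[Row], pred: Row => Boolean) extends Iterator[Row] {
  private val it = child.filter(pred)
  def hasNext: Boolean = it.hasNext
  def next(): Row = it.next()
}

// Upstream operators consume whatever the scan yields:
val scan = new TableScan(Seq(Map("item" -> "a", "pv" -> 3L)))
val plan = new Filter(scan, r => r("pv").asInstanceOf[Long] > 1L)
plan.foreach(println)
```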
Kylin On HBase bottleneck
This architecture is not a big problem for simple SQL, because when the Cuboid matches exactly, the Kylin Query Server does little computation after retrieving data from HBase. But consider a more complex query, such as one that joins two subqueries, where each subquery matches its own Cube and the outermost layer performs complex Aggregate operations such as COUNT DISTINCT. In that case the Kylin Query Server not only pulls a large amount of data back from HBase, it also has to compute the Join/Aggregate and other very time- and resource-consuming operations itself. As the data volume grows, the Query Server may run out of memory (OOM); the workaround is to give it more memory, but that is vertical scaling, and the Query Server becomes a single-point bottleneck. A single-point bottleneck in a big data solution is a serious problem that can get an architecture vetoed outright during technology selection.
There are a number of other limitations:
- HBase is notoriously difficult to operate and maintain; if HBase performs badly, Kylin performs badly too.
- HBase's resource isolation is weak. A heavy load at one moment affects the other services on the HBase cluster, so Kylin's query performance can become unstable; benchmarks show latency spikes that are cumbersome to explain and require cluster metrics to diagnose, which is demanding for front-line engineers.
- HBase stores encoded Byte Arrays, so serialization and deserialization costs cannot be ignored. For us as developers, Calcite's generated code is hard to debug, and our team's HBase expertise runs less deep, making source-level performance improvements to HBase difficult.
Kylin On Parquet
Because of the above limitations of the Kylin On HBase solution, our company has long been developing a new-generation architecture in the commercial version, based on Spark + Parquet, to replace the open source one. The overall design is described below:
Overall, the new design is very simple: we use the visitor pattern to traverse the previously generated logical execution plan tree that queries Cube data. Each node of the plan tree represents an operator and holds information such as which table to scan and which columns to filter or project. Each operator on the original tree is translated into an operation on a Spark DataFrame: every upstream node asks its downstream node for a DF and then applies its own processing, all the way down to the most downstream TableScan node, which generates the initial DF, conceptually cuboidDF = spark.read.parquet(path). Finally, the topmost node collects this DF, triggering the whole computation. The idea of the framework is simple, but the gap between Calcite and Spark held more pitfalls than we expected, such as inconsistent data types, supported functions, and behavior definitions on the two sides. In the future we also plan to replace Calcite with Catalyst, which will make the whole architecture more compact and natural.
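A simplified sketch of that visitor-style translation (the node types and field names here are invented for illustration; the real operator set is much richer):

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical, simplified plan nodes; the real tree comes from Calcite.
sealed trait PlanNode
case class TableScan(path: String)                                  extends PlanNode
case class Project(child: PlanNode, columns: Seq[String])           extends PlanNode
case class Aggregate(child: PlanNode, key: String, measure: String) extends PlanNode

// Each node asks its child for a DataFrame, transforms it, and hands it up.
def visit(node: PlanNode)(implicit spark: SparkSession): DataFrame = node match {
  case TableScan(path)        => spark.read.parquet(path) // the initial DF
  case Project(child, cols)   => visit(child).select(cols.head, cols.tail: _*)
  case Aggregate(child, k, m) => visit(child).groupBy(k).sum(m)
}

// The topmost node collects the final DF, which triggers the whole computation:
// visit(plan).collect()
```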
This Kylin On Parquet architecture, being built on Spark, brings several benefits:
- All computation is distributed, with no single-point bottleneck; the system's compute capacity can be scaled out horizontally.
- Mature resource scheduling options are available: Yarn, K8s, and Mesos satisfy enterprise resource isolation requirements.
- Spark's ongoing performance work comes for free; for example, the serialization and deserialization costs of Kylin On HBase mentioned above are optimized by Spark's Tungsten project.
- The HBase dependency is removed, which greatly simplifies operations; all upstream and downstream dependencies can be handled by Spark, reducing our own maintenance burden and easing deployment to the cloud.
- For developers, the DF produced by each operator can be collected directly to check whether that layer's data is correct. Spark + Parquet is currently a very popular SQL-on-Hadoop stack, and our team is familiar with both projects; we maintain our own Spark and Parquet branches, where we have done a lot of work to optimize performance and improve stability for our specific scenarios.
We are currently contributing this solution back to the open source community, and a detailed benchmark report can be published once the contribution is complete. Since it is not finished yet, I won't give a direct performance comparison between the two architectures here, but our enterprise version's numbers are very strong compared with the open source ones, and query stability is also significantly better. Under TPC-H at the 1000 scale factor, the current Kylin On HBase implementation cannot return results for some complex queries, whereas Kylin On Parquet returns results within a reasonable time.
Next, let's look at deduplication (count distinct) analysis. Deduplication is used frequently in day-to-day enterprise analysis, and doing it fast over big data has always been a major difficulty. Apache Kylin uses precomputation + Bitmap to accelerate this scenario, enabling fast responses for exact count distinct on very large datasets.
Precise deduplication in Kylin
Here is an example to lead into the discussion that follows:
As in the item access table above, this time we want to compute each item's UV, a very typical deduplication scenario. Our data is stored on a distributed platform, across data nodes 1 and 2.
The distributed computing framework starts tasks and, following the SQL's GROUP BY item, shuffles the raw data from the two nodes so that rows with the same item land on the same node, then deduplicates the user_ids of each item. The whole process shuffles the original user_id values, which is expensive. But in the end we only need one statistic per item: the distinct count of user_id. Can we avoid shuffling the entire set of original user_id values?
If the measure were a simple COUNT, each data node could compute the count of user_ids for its items locally and shuffle only the counts; since a count is just one number, the shuffle would be tiny. But because the measure is COUNT DISTINCT, we cannot simply add the two nodes' local distinct counts: only by collecting all the user_ids for a key can we compute the correct distinct count, and those values may be spread across different nodes, so we have no choice but to shuffle them to the same node and deduplicate there. If the user_id column is very large, the shuffled data is also very large. Yet all we really need at the end is a single count, so is there a way to avoid shuffling the column's original values? The algorithms introduced below provide exactly this idea: using far fewer bits of information, it is still possible to find the number of distinct elements in a column (its cardinality).
Bitmap algorithm
The first algorithm is an exact deduplication algorithm based on the Bitmap principle. A Bitmap, also called a Bitset, is essentially a large array of bits, where each element of a set corresponds to one bit position in the array. For example, for the set [2, 3, 5, 8], the corresponding Bitmap array is [001101001]: the 2 in the set maps to array index 2, the 3 maps to index 3, and so on. The number of 1s in the array is the cardinality of the set. Tracing back to first principles, our goal is to represent more information with less memory, and the smallest unit of information in a computer is the bit; if one bit can represent one element of a set, we save a great deal of storage compared with storing the original elements.
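A tiny sketch of the idea using the JDK's java.util.BitSet (illustrative, not Kylin's implementation):

```scala
import java.util.BitSet

// The set [2, 3, 5, 8] as a bitmap: one bit per possible element, and the
// number of set bits is the set's cardinality (its count distinct).
val bits = new BitSet()
Seq(2, 3, 5, 8).foreach(bits.set)
println(bits.cardinality()) // 4
```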
We can think of a Bitmap as a container. An Integer is 32 bits, so a Bitmap that must be able to hold values up to Integer.MAX_VALUE needs a length of 2^32 bits. Such a Bitmap occupies 512 MB (2^32 / 8 / 1024 / 1024), which exposes an obvious problem: whether it holds one element or four billion, it occupies the same 512 MB. Back to the UV scenario: not every item gets that much traffic. A hot item may be visited hundreds of millions of times, while an unpopular one may be browsed by only a few users. If every item used such a Bitmap, they would all occupy the same space, which is clearly unacceptable.
An improved Bitmap: Roaring Bitmap
Roaring Bitmap: Array Container
The Array Container is the default container when a Roaring Bitmap is initialized, and it is suitable for storing sparse data. Internally it is a sorted short array, which makes lookup easy. The array's initial size is 4 and its maximum size is 4096; when it would exceed 4096 elements, the container is converted to a Bitmap Container. Here is an example of putting data into an Array Container: suppose the two numbers 0xFFFF0000 and 0xFFFF0001 need to be placed in a Bitmap. The high 16 bits of both are 0xFFFF, so they share the same key, and their low 16 bits are stored in the same container. The low 16 bits are 0 and 1 respectively, so we store 0 and 1 in the Array Container. Compared with the 512 MB a plain Bitmap would occupy, this storage actually takes only 2 + 4 = 6 bytes (2 bytes for the key and 4 bytes for the two values, ignoring the array's initial size).
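The key split from that example, sketched in code (my own illustration of the rule described above):

```scala
// Roaring Bitmap splits each 32-bit value: the high 16 bits choose the
// container (the key), and the low 16 bits are what the container stores.
def split(v: Int): (Int, Int) = (v >>> 16, v & 0xFFFF)

val (k1, low1) = split(0xFFFF0000) // key = 0xFFFF, low = 0x0000
val (k2, low2) = split(0xFFFF0001) // key = 0xFFFF, low = 0x0001
assert(k1 == k2) // same key, so both lows land in the same Array Container
```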
Roaring Bitmap: Bitmap Container
The second container type is the Bitmap Container. Its data structure is a long array with a fixed capacity of 1024, unlike the Array Container's dynamically growing array. Let's derive the 1024: each container handles the low 16 bits of its values, and storing all of them as a Bitmap needs 8192 bytes (2^16 / 8); one long holds 8 bytes, so 1024 (8192 / 8) longs are required. A Bitmap Container therefore occupies 8 KB (1024 × 8 bytes) of memory. When an Array Container reaches 4096 elements, it occupies exactly 8 KB (4096 × 2 bytes), the same 8 KB a Bitmap Container uses. Beyond 4096 elements the Array Container's size would keep growing linearly, while the Bitmap Container's memory never grows, so when an Array Container exceeds its maximum capacity (DEFAULT_MAX_SIZE) it is converted to a Bitmap Container.
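To pin down the layout, here is a sketch of a Bitmap Container in my own words (illustrative, not RoaringBitmap's code):

```scala
// A Bitmap Container is a fixed array of 1024 longs: 1024 * 64 = 65536 bits,
// one per possible low-16-bit value, for a constant 8 KB footprint.
val words = new Array[Long](1024)
def add(low: Int): Unit = words(low >>> 6) |= 1L << (low & 63)

add(0); add(1) // the footprint stays 8 KB whether 2 or 65536 values are stored
val cardinality = words.map(java.lang.Long.bitCount(_)).sum
println(cardinality) // 2
```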
When we put Roaring Bitmap into practice in Kylin, we found that the Array Container keeps resizing its array as the data volume grows, and resizing a Java array costs performance: new memory is continually allocated, and the old memory cannot be released until the copy completes. We therefore recommend lowering DEFAULT_MAX_SIZE to 1024 or 2048, so the container converts to a Bitmap Container earlier, reducing the number and cost of Array Container resizes.
Roaring Bitmap: Container summary
When the number of elements exceeds 4096, the space occupied by an Array Container grows linearly, while a Bitmap Container's footprint is independent of the data volume, so beyond that point the Bitmap Container is the better deal. The storage occupied by a Run Container depends on how contiguous the data is, so only a range can be given: from 4 bytes to 128 KB.
Back to the scenario
Let's return to the earlier deduplication scenario and see what the Bitmap buys us. Without optimization, the user_ids for each item are shuffled as a set of original values. With the Bitmap optimization, the user_ids for each item become a Bitmap instance, which (in most cases) occupies far less space than storing the raw values directly. This meets our original goal of reducing the amount of shuffled data.
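Here is a hedged sketch with the org.roaringbitmap library (the data is made up): each node sends one compact bitmap per item instead of raw user_ids, and merging the shuffled bitmaps is a cheap OR.

```scala
import org.roaringbitmap.RoaringBitmap

// Per-item user_id bitmaps built locally on each data node (made-up data).
val fromNode1 = RoaringBitmap.bitmapOf(100, 101, 102)
val fromNode2 = RoaringBitmap.bitmapOf(101, 103)

// Merging is an OR; the item's UV is the merged bitmap's cardinality.
val merged = RoaringBitmap.or(fromNode1, fromNode2)
println(merged.getCardinality) // 4 distinct users
```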
A clever use of Kylin's bitmaps: user behavior analysis
Bitmaps support not only efficient OR operations but also efficient AND operations. In the example above, we can directly reuse the bitmaps we have already built to analyze user behavior.
To make the AND operation easy to express in SQL, Kylin provides a custom function, intersect_count (see the Apache Kylin official documentation for details). As the name implies, it returns the count of the result after intersection.
As you can see, in other query engines computing a user's 2-day retention requires joining two subqueries and three COUNT DISTINCT aggregations, whereas Kylin only needs a direct call to intersect_count to support this kind of analysis.
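Conceptually, intersect_count boils down to an AND between the stored bitmaps. A sketch of that underlying operation (illustrative data):

```scala
import org.roaringbitmap.RoaringBitmap

// Users active on day 1 and day 2 (made-up IDs); retention is the
// cardinality of the AND of the two daily bitmaps.
val day1 = RoaringBitmap.bitmapOf(1, 2, 3, 4)
val day2 = RoaringBitmap.bitmapOf(2, 4, 5)
println(RoaringBitmap.and(day1, day2).getCardinality) // 2 users retained
```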
Finally, Apache Kylin has more than 1,000 users worldwide, which means many users have already stepped on the pitfalls for us. Apache Kylin is a very mature big data OLAP solution, so feel free to choose it with confidence in your technology selection. You are welcome to add me on WeChat for further discussion (WeChat ID: 245915794)!
Q&A highlights
Question: How does Kylin on Parquet use Spark, through Thrift Server?
Answer: At startup, Kylin submits a long-running SparkContext to Yarn, with Kylin acting as the driver; all subsequent queries are sent to this application for computation.
Question: How does Bitmap handle non-numeric data?
Answer: A global dictionary is built for such data; each value is mapped to an integer ID, and the IDs are used to build the Bitmap.
Question: How is the global dictionary used? Is it consulted on every query?
Answer: The global dictionary is used only at build time to generate the bitmaps; afterwards the Cube data carries an extra Bitmap measure column that can be aggregated directly at query time.
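A minimal sketch of the build-time encoding idea (Kylin's real global dictionary is distributed and persisted; this in-memory version only shows the mapping):

```scala
import org.roaringbitmap.RoaringBitmap
import scala.collection.mutable

// Assign each non-numeric value a stable integer ID, then add IDs to the bitmap.
val dict = mutable.LinkedHashMap[String, Int]()
def encode(v: String): Int = dict.getOrElseUpdate(v, dict.size)

val bitmap = new RoaringBitmap()
Seq("user_a", "user_b", "user_a").foreach(u => bitmap.add(encode(u)))
println(bitmap.getCardinality) // 2 distinct users
```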
Question: Will the Cube take up a lot of space?
Answer: Without pruning, the Cube suffers the "curse of dimensionality" and space expansion is very severe, so Kylin has pruning mechanisms. For example, if the three dimensions A, B, and C must always be analyzed together, Cuboids such as ABD can be pruned away. See the documentation on Kylin's website for details.
Live video
V.qq.com/x/page/w301…
More great talks
Shanghai Salon Recap | ByteDance's Core Optimization Practices on Spark SQL
Shanghai Salon Recap | How ByteDance Optimizes Its HDFS Platform with Tens of Thousands of Nodes
Shanghai Salon Recap | Applications of Redis Caching in Big Data Scenarios
Bytedance Technology Salon
Bytedance Technology Salon is a technical exchange event initiated by the Bytedance Institute of Technology and co-hosted by the Bytedance Institute of Technology and the Nuggets (Juejin) technology community.
Bytedance Technology Salon invites technical experts from Bytedance and Internet companies in the industry to share hot technical topics and front-line practical experience, covering technical fields such as architecture, big data, front-end, testing, operation and maintenance, algorithms and systems.
Bytedance Technology Salon aims to provide an open and free exchange and learning platform for technical talents, helping them learn and grow, and keep advancing.
Welcome to Bytedance Technical Team