Those familiar with Kylin know it mainly as an offline (batch) processing engine; far less is known about its real-time processing capabilities. I would like to share what I have learned about them recently.

In fact, the community already provided a near-real-time solution in version 1.6, but its minute-level preparation latency is not acceptable in scenarios with strict real-time requirements. Moreover, that solution creates a segment for each batch of data, and each segment corresponds to one HBase table, so over time a large number of HBase tables and MapReduce jobs accumulate. For these reasons, the Kylin community began researching and implementing true real-time stream processing.

Architecture overview

Real-time stream processing consists mainly of five components: Streaming Receivers, the Streaming Coordinator, the Query Engine, the Build Engine, and the Metadata Store.

Streaming Receivers: consume the real-time data. One partition can be consumed by one or more Streaming Receivers to ensure HA, and the receivers exist as a cluster.

Streaming Coordinator: acts as the coordinator of the Streaming Receivers, assigning which receivers are responsible for consuming which topic data.

Query Engine: the data query engine, which provides the query capability.

Build Engine: responsible for building the real-time data into historical data.

Metadata Store: stores metadata such as receiver assignment information and high-availability information.

Data architecture

As can be seen from the figure, data flows from message components such as Kafka into memory. When the data reaches a specified threshold, or after a fixed period of time (one hour by default), the in-memory data is flushed to disk. After a further period of time, the on-disk data is synchronized to HBase through a MapReduce job.

Therefore, a data query needs to aggregate data from three places: memory, local disk, and HBase.
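To make this tiered read concrete, here is a minimal sketch. The class and method names (TieredScanSketch, PartialSource, and so on) are hypothetical and are not Kylin's actual APIs; the point is only that one query must merge partial results from the in-memory store, the on-disk fragments, and HBase.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch only: the names below are hypothetical, not Kylin's real classes.
public class TieredScanSketch {

    interface PartialSource {
        List<String[]> scan(String cube, long startTime, long endTime);
    }

    private final PartialSource memoryStore;    // rows not yet flushed to disk
    private final PartialSource fragmentStore;  // local fragment files on disk
    private final PartialSource hbaseStore;     // historical segments in HBase

    public TieredScanSketch(PartialSource memoryStore,
                            PartialSource fragmentStore,
                            PartialSource hbaseStore) {
        this.memoryStore = memoryStore;
        this.fragmentStore = fragmentStore;
        this.hbaseStore = hbaseStore;
    }

    // A query over a time range touches all three tiers and merges the results.
    public List<String[]> scan(String cube, long startTime, long endTime) {
        List<String[]> rows = new ArrayList<>();
        rows.addAll(hbaseStore.scan(cube, startTime, endTime));
        rows.addAll(fragmentStore.scan(cube, startTime, endTime));
        rows.addAll(memoryStore.scan(cube, startTime, endTime));
        return rows; // in reality the partial aggregates would also be re-aggregated
    }
}
```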

Storage process

The Streaming Coordinator assigns which Receivers consume the partition data of the topic associated with the cube. The Receivers build the Base Cuboid, and can also build some commonly used cuboids to improve query performance. After a period of time, the local data is uploaded to HDFS; once all replica sets have uploaded all the real-time data of a cube segment to HDFS, the Coordinator is notified. The Coordinator then triggers a MapReduce job to perform a batch build: the job pulls the real-time data from HDFS, builds and merges it, and synchronizes the cuboids into HBase. When the synchronization is complete, the Coordinator notifies the real-time cluster to delete the data.
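The coordinator-side part of this flow can be sketched as follows. This is a hypothetical illustration, not Kylin's real code; the class and method names (SegmentPromotionSketch, onSegmentUploaded, etc.) are invented to show the sequence: wait for all replica sets to upload, trigger the batch build, then ask the receivers to drop their local copies.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of the coordinator-side flow described above.
public class SegmentPromotionSketch {

    private final Set<String> replicaSets;            // all replica sets assigned to the cube
    private final Set<String> uploadedReplicaSets = new HashSet<>();

    public SegmentPromotionSketch(Set<String> replicaSets) {
        this.replicaSets = replicaSets;
    }

    // Called when a replica set reports that it has uploaded the segment's
    // real-time data to HDFS.
    public synchronized void onSegmentUploaded(String replicaSet, String segment) {
        uploadedReplicaSets.add(replicaSet);
        if (uploadedReplicaSets.containsAll(replicaSets)) {
            submitBatchBuild(segment);                 // trigger the MapReduce build
        }
    }

    private void submitBatchBuild(String segment) {
        // 1. A MapReduce job reads the segment's real-time data from HDFS,
        //    merges it and writes the cuboids into HBase (not shown here).
        // 2. When the job finishes, tell the real-time cluster it can drop
        //    its local copy of the segment data.
        notifyReceiversToDeleteLocalData(segment);
    }

    private void notifyReceiversToDeleteLocalData(String segment) {
        // RPC to the receivers; omitted in this sketch.
    }
}
```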

Data query

When the QueryServer receives a new query, it asks the Coordinator whether the queried cube is a real-time cube, and the request is then processed accordingly. For an offline cube, the data is queried directly from HBase. For a real-time cube, where part of the data is not yet in HBase, an RPC request is sent to HBase and a query request is sent to the real-time cluster at the same time; the results are aggregated in the query engine and returned to the user.

Both real-time and non-real-time cube requests pass through the Coordinator; the QueryServer acts as the unified entry point for requests, which are then processed differently for the real-time and non-real-time cases.
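The routing logic can be sketched like this. Again the interfaces and names are assumptions used only to make the flow concrete; they do not mirror Kylin's actual query path.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Illustrative routing sketch, not Kylin's actual implementation.
public class QueryRoutingSketch {

    interface Coordinator { boolean isRealtimeCube(String cube); }
    interface Backend { List<String[]> query(String cube, String sql); }

    private final Coordinator coordinator;
    private final Backend hbaseBackend;      // historical segments
    private final Backend streamingBackend;  // real-time receiver cluster

    public QueryRoutingSketch(Coordinator c, Backend hbase, Backend streaming) {
        this.coordinator = c;
        this.hbaseBackend = hbase;
        this.streamingBackend = streaming;
    }

    public List<String[]> execute(String cube, String sql) {
        if (!coordinator.isRealtimeCube(cube)) {
            // Offline cube: all data lives in HBase.
            return hbaseBackend.query(cube, sql);
        }
        // Real-time cube: fan out to HBase and the real-time cluster in parallel,
        // then merge the partial results in the query engine.
        CompletableFuture<List<String[]>> historical =
                CompletableFuture.supplyAsync(() -> hbaseBackend.query(cube, sql));
        CompletableFuture<List<String[]>> realtime =
                CompletableFuture.supplyAsync(() -> streamingBackend.query(cube, sql));

        List<String[]> merged = new ArrayList<>(historical.join());
        merged.addAll(realtime.join());
        return merged;
    }
}
```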

Implementation details

Fragment File storage format

As mentioned above, in-memory data is written to disk, and the Fragment file is the corresponding persistence format.

The segment name consists of a start time and an end time. Each new Fragment file is assigned a Fragment ID, which is a monotonically increasing value.

A Fragment has a columnar structure and consists of two files: the Fragment data file and the metadata file. A data file can contain multiple cuboids. By default only the Base Cuboid is built; if other mandatory cuboids are configured, multiple cuboids are generated. The data is stored cuboid by cuboid, and within each cuboid it is stored column by column, with the data of the same column stored together. Most of today’s OLAP storage engines use columnar storage for performance. The data of each dimension includes three parts:

  • The first part is the dictionary, which builds a dictionary of the dimension values.
  • The second part is the dictionary-encoded values.
  • The third part is the inverted index.

The metadata file contains important metadata, such as the offset at which each dimension's data starts, the length of the data, and the length of the index, so that the data can be located quickly at query time. The metadata also contains compression information, specifying the method by which the data file is compressed.
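The following is a simplified, hypothetical model of this per-dimension layout. The class and field names are invented for illustration; the real on-disk format is more compact and differs in detail.

```java
import java.util.BitSet;
import java.util.Map;

// Hypothetical, simplified model of the Fragment layout described above.
public class FragmentLayoutSketch {

    // One dimension column inside a cuboid of the data file.
    static class DimensionColumn {
        String[] dictionary;                // part 1: dictionary of distinct dimension values
        int[] encodedValues;                // part 2: row values encoded as dictionary ids
        Map<Integer, BitSet> invertedIndex; // part 3: dictionary id -> rows containing it
    }

    // Entry in the metadata file used to locate a column quickly at query time.
    static class ColumnMeta {
        long offset;        // where this dimension's data starts in the data file
        long dataLength;    // length of the encoded value block
        long indexLength;   // length of the inverted index block
        String compression; // e.g. run-length for time-like columns, LZ4 otherwise
    }
}
```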

The compression

  • For time-related dimensions, the data is largely similar or monotonically increasing. In addition, cube design includes row key design; if such a dimension is placed first in the row key, run-length encoding achieves a higher compression ratio, and reads are also more efficient, as sketched after this list.
  • Other data is compressed with LZ4 by default. Although other compression algorithms may achieve a higher compression ratio than LZ4, LZ4 has better decompression performance. LZ4 was chosen mainly from the query perspective; other perspectives might lead to different conclusions.
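Below is a minimal run-length encoding sketch (not Kylin's implementation) showing why a sorted, largely repetitive time column compresses so well: long runs of identical values collapse into a single (value, count) pair.

```java
import java.util.ArrayList;
import java.util.List;

// Minimal run-length encoding sketch for a sorted time column.
public class RunLengthSketch {

    static List<long[]> encode(long[] sortedTimestamps) {
        List<long[]> runs = new ArrayList<>();
        int i = 0;
        while (i < sortedTimestamps.length) {
            long value = sortedTimestamps[i];
            int count = 0;
            while (i < sortedTimestamps.length && sortedTimestamps[i] == value) {
                count++;
                i++;
            }
            runs.add(new long[] { value, count });
        }
        return runs;
    }

    public static void main(String[] args) {
        // Events bucketed to the minute: many identical timestamps in a row.
        long[] minuteBuckets = { 1000, 1000, 1000, 1060, 1060, 1120 };
        // Encodes to [(1000, 3), (1060, 2), (1120, 1)]: 3 runs instead of 6 values.
        System.out.println(encode(minuteBuckets).size());
    }
}
```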

High availability

High availability is achieved by introducing the concept of a Replica Set. A Replica Set can contain multiple receivers; all receivers in a Replica Set share the same assignment and consume the same data. Within a Replica Set, one receiver acts as the Leader and does the extra work of saving the real-time data to HDFS. Leader election is handled by ZooKeeper. This is how the real-time cluster implements HA, so that queries and builds are not disrupted when a node goes down.
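For reference, a generic ZooKeeper leader-election sketch using Apache Curator's LeaderLatch is shown below. It only illustrates how one receiver per replica set could be elected to do the extra HDFS upload work; Kylin's own election code may be implemented differently, and the ZooKeeper address and latch path here are placeholders.

```java
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Generic Curator-based leader election sketch, not Kylin's actual code.
public class ReplicaSetLeaderSketch {

    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "zk-host:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        // One latch path per replica set (path name is illustrative).
        LeaderLatch latch = new LeaderLatch(client, "/kylin-sketch/replica-set-0/leader");
        latch.start();
        latch.await(); // block until this receiver becomes the leader

        // Only the leader uploads the replica set's real-time data to HDFS.
        uploadSegmentDataToHdfs();

        latch.close();
        client.close();
    }

    private static void uploadSegmentDataToHdfs() {
        // Flush/upload logic omitted in this sketch.
    }
}
```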

Checkpoint

Checkpoints are used for failure recovery, ensuring that no data is lost when the service restarts. There are two mechanisms. The first is the local checkpoint, which is written to local storage every 5 minutes; it saves consumption state to a file, including the consumed offsets and local disk information such as the maximum Fragment ID, and this state is used to recover on restart. The second is the remote checkpoint, which stores the consumption state in the HBase segment: when a historical segment is saved, the consumption information, such as from which offset the segment's data was first consumed, is stored in the segment's metadata.
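A hypothetical local-checkpoint sketch follows: every 5 minutes the receiver writes its consumed offsets and the largest local Fragment ID to a file so it can resume from that state after a restart. The file path, format, and field names are invented for illustration; Kylin's real checkpoint contents differ in detail.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical local checkpoint writer, not Kylin's actual implementation.
public class LocalCheckpointSketch {

    private final Path checkpointFile = Paths.get("stream_index/checkpoint");
    private final Map<Integer, Long> consumedOffsets = new ConcurrentHashMap<>();
    private final AtomicLong maxFragmentId = new AtomicLong();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();

    // The consumer thread calls this after each batch it processes.
    public void recordProgress(int partition, long offset, long fragmentId) {
        consumedOffsets.put(partition, offset);
        maxFragmentId.set(Math.max(maxFragmentId.get(), fragmentId));
    }

    public void start() {
        scheduler.scheduleAtFixedRate(this::write, 5, 5, TimeUnit.MINUTES);
    }

    private void write() {
        StringBuilder sb = new StringBuilder();
        sb.append("maxFragmentId=").append(maxFragmentId.get()).append('\n');
        consumedOffsets.forEach((partition, offset) ->
                sb.append("partition.").append(partition).append(".offset=")
                  .append(offset).append('\n'));
        try {
            Files.write(checkpointFile, sb.toString().getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            // A failed checkpoint only delays recovery; consumption continues.
        }
    }
}
```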

Review questions

1. Where is Kylin real-time streaming data stored?

In three places: local memory, HDFS (hard disk), and HBase.

2. Briefly describe the role of the Streaming Coordinator.

  1. Coordinates which receivers consume which partitions' data
  2. Notifies the Build Engine to start a MapReduce job to process the real-time data stored on HDFS
  3. Once the real-time data has been stored in HBase, notifies the Build Engine to delete the data on HDFS
  4. At query time, coordinates the different query types and applies the corresponding processing logic

3. Which cuboids are built from the real-time data?

By default only the Base Cuboid is built from the real-time data; additional cuboids are built if other mandatory cuboids are configured.

4. Why use LZ4 compression format?

To make queries more efficient: a lower compression ratio is accepted in exchange for faster decompression.

5. What information does the Fragment File hold?

The cube name and the segment name (derived from the time range). The main files are the .data and .meta files: the .data file contains the dictionary, the encoded values, and the inverted index, and the .meta file contains the related metadata.

Original article: https://www.infoq.cn/article/…