This article is compiled from the talk “Practice of Flink-Based Ad-Hoc Query over Petabyte-Scale Data”, shared by Su Jun and Liu Jia, big data engineers at 360 Government and Enterprise Security Group, at Flink Forward Asia 2020. The content is organized as follows:
- Architecture and Design of the Threat Hunting Platform (Su Jun)
- Optimization and Exploration Aimed at Reducing IO (Liu Jia)
- Future Planning
Let's start with a brief introduction of ourselves and the team. We are from 360 Government and Enterprise Security Group, and we currently work mainly on the development of 360 Security Brain's “threat hunting” project. Our team adopted Flink early on, has since built a number of products on top of it, and presented at the Flink Forward conferences held in Berlin in 2017 and 2019, where we introduced UEBA and AutoML respectively.
This talk is divided into two parts:
- The first part, “Architecture and Design of the Threat Hunting Platform”, will be presented by Su Jun.
- The second part, “Optimization and Exploration Aimed at Reducing IO”, will be presented by Liu Jia.
I. Architecture and Design of the Threat Hunting Platform (Su Jun)
The first part covers three topics:
- Evolution of the platform
- Architecture design
- Explore the index structure in depth
1. Platform evolution
We believe that all technological evolution and innovation need to be driven by specific business problems. Here are some of the products our team has developed based on Flink in recent years:
- In 2017, based on Flink DataStream, we developed UEBA, a user and entity behavior analytics system. It ingests various kinds of behavior data from the enterprise IT environment, such as identity authentication data, application access data, endpoint security data, and parsed network traffic, performs real-time detection of threat behavior centered on users and assets, and finally builds a system of user threat levels and profiles.
- In 2018, drawing on the implementation experience of UEBA, we found that security analysts often need a way to retrieve the original logs behind a security event in order to trace the source of a threat and decide how to respond. We therefore developed HQL on top of Spark to solve data retrieval in offline scenarios. HQL is a query language with richer expressive power than SQL; it can be viewed as SQL extended with algorithm-class computations.
- In 2019, as customers used offline HQL, we found that it lets them quickly define security rules and build threat models: if statements written in offline mode can be published directly as online tasks, the development cycle is greatly shortened. Combined with Flink's relatively mature SQL capability, this led us to upgrade HQL on top of Flink SQL + CEP and release a real-time version of HQL.
- In 2020, as customer data volumes grew, many reached the petabyte level, and the previous solution left offline data retrieval performance far below expectations. Security analysts are also used to fuzzy-matching operations such as LIKE and full-text search, which led to very high query latency. So starting this year we focused on optimizing HQL's offline retrieval capability and launched the new Threat Hunting platform.
According to our survey, customers with petabyte-scale data typically have the following business needs:
- The first is a low-cost, cloud-native architecture. As we know, most current big data architectures are based on Hadoop, which co-locates data with the compute nodes; this reduces network overhead and speeds up computation, but to keep resources balanced the whole cluster usually needs the same hardware configuration, and to store as much data as possible the cluster has to be large, so such an architecture requires heavy up-front hardware investment. Separating storage from compute, combined with elastic computing, solves this problem: disks are far cheaper than memory and CPU, so data can be stored on cheap, disk-heavy machines with modest CPU and memory, while a small number of high-spec machines do the computation, which greatly reduces cost.
- The second is low-latency query response. When security analysts do threat detection, most of their work is ad-hoc querying, that is, retrieving and correlating data through filters and joins. To return results as quickly as possible, the corresponding technical measures are columnar storage, indexing, and caching.
- Columnar storage goes without saying; it is the standard storage layout in the big data field.
- On top of columnar storage, an efficient index scheme can greatly reduce IO and improve query performance.
- The network latency introduced by storage-compute separation can be compensated for by a distributed cache.
- The third is rich query capability, including single-row fields/filters/UDFs, multi-row aggregations and joins, and even algorithm-class analysis, which we provide mainly through our self-developed HQL.
2. Architecture design
First, the data sources are the historical data stored in ES and the real-time data in Kafka. The historical data in ES is synchronized by our own synchronization tool, while the real-time data in Kafka is written as ORC files to the storage cluster through the Streaming File Sink. As the data is synchronized, we update the index information for each batch of data in the index database.
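The following is a minimal sketch of what the Kafka-to-ORC path could look like with Flink's StreamingFileSink and the flink-orc bulk writer (Flink 1.11+ style API). The Event POJO, field names, and output path are illustrative assumptions, not the team's actual code; the index-metadata update mentioned above is not shown.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

import org.apache.flink.core.fs.Path;
import org.apache.flink.orc.vector.Vectorizer;
import org.apache.flink.orc.writer.OrcBulkWriterFactory;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;

public class OrcSinkSketch {

    /** Hypothetical event type: an occurrence time plus a source address. */
    public static class Event {
        public long occurTime;
        public String srcAddress;
    }

    /** Converts events into ORC row batches for the bulk writer. */
    static class EventVectorizer extends Vectorizer<Event> {
        EventVectorizer() {
            super("struct<occur_time:bigint,src_address:string>");
        }

        @Override
        public void vectorize(Event e, VectorizedRowBatch batch) throws IOException {
            int row = batch.size++;
            ((LongColumnVector) batch.cols[0]).vector[row] = e.occurTime;
            ((BytesColumnVector) batch.cols[1])
                .setVal(row, e.srcAddress.getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void addSink(DataStream<Event> events) {
        StreamingFileSink<Event> sink = StreamingFileSink
            .forBulkFormat(new Path("hdfs:///warehouse/terminal_events"),
                           new OrcBulkWriterFactory<>(new EventVectorizer()))
            .build();
        events.addSink(sink);
        // Index metadata (min/max statistics, Bloom filters) for each finished file
        // would be written to the index database by a separate step, not shown here.
    }
}
```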
Security analysts write interactive HQL on the front-end page. The statement is parsed into a data retrieval request and sent to the scheduling system. Once the job starts, the statement is first parsed into a list of operators, and the operator-cache algorithm then determines whether an existing intermediate result in the cache system can be hit (a simplified sketch follows the list below).
- If the input of the analysis statement is an intermediate result that has already been computed and is present in the cache, the cache is read directly and computation continues from there.
- If not, we have to recompute from the ORC files.
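As a simplified illustration of the operator-cache check (not the production code): each operator subtree is hashed into a deterministic cache key, and if the intermediate result for that key already exists in the cache system, the plan reads the cached result instead of recomputing from ORC. The CacheStore interface is a hypothetical stand-in for the real cache service.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical client of the cache system; returns the path of a cached result, if any. */
interface CacheStore {
    Optional<String> lookup(String key);
}

class OperatorCachePlanner {
    private final CacheStore cache;

    OperatorCachePlanner(CacheStore cache) {
        this.cache = cache;
    }

    /** Returns the cached result location for an operator subtree, if one exists. */
    Optional<String> tryHitCache(List<String> operatorSubtree) {
        // The key must be deterministic for an identical subtree
        // (same operators with the same parameters, in the same order).
        String key = Integer.toHexString(String.join("|", operatorSubtree).hashCode());
        return cache.lookup(key);
    }
}
```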
We first extract the filter and join conditions of the query for predicate pushdown, query the index database to obtain the list of files matching the current query, and then hand that file list to the computing engine. The computing engine follows a dual-engine model: high-complexity statements are executed by the Flink engine, while simpler tasks go to the platform's internal “Hummingbird engine”, which is based on Apache Arrow for vectorized execution and, combined with LLVM compilation, achieves very low query latency.
Because the whole system separates storage from compute, we added Alluxio on the compute cluster nodes to provide a data cache service and speed up reads. It caches not only data from the remote cluster but also part of the historical job results, which the operator-cache algorithm uses to accelerate subsequent computing tasks.
Two more points need to be stressed here:
- The first concerns the file list returned by the index database. If the list is very large, the current Flink version spends a long time in the file-list statistics logic while building the Job Graph, because it walks through every file. We are currently fixing this and will contribute the fix back to the community.
- The second is the data cache. Our HQL was previously implemented on Spark, and those of you who have used Spark may know that it caches or persists tables via cache/persist operators. We kept the same operators when migrating to Flink, but implemented the mechanism ourselves: when the user caches a table, we register it as a new table source, and subsequent reads go through this new table source for the rest of the pipeline.
3. Explore the index structure in depth
To speed up data retrieval, databases usually create indexes in advance and use them to locate the starting position of the data before scanning. Row-level indexes are common in traditional databases: an index is built over one or several fields and stored in a tree structure. Such indexes are accurate to the row level and offer the highest lookup efficiency.
Some big data projects also support row-level indexes, but their drawback is that the large amount of index data causes delays in both writing and retrieval. Our platform, however, handles machine data such as endpoint and network data, which is highly repetitive, while the results of security analysis are usually very small: a tiny number of threat behaviors hide in massive data, accounting for one in a thousand or even less.
Therefore, we chose the more cost-effective block-index scheme, which is sufficient for our current application scenarios. With current customer data, the index provides a pruning rate of more than 90% for 85% of the statements, which basically meets the latency requirements.
While some big data platforms store index data as files on disk, plus a cache mechanism to speed up access, we store the index data directly in a database. There are two main considerations:
- The first is transactions. As we know, the stored files are not updatable, and we periodically merge files to optimize their distribution, so transactional capability is needed to keep queries consistent.
- The second is performance. The database has strong read, write, and retrieval capabilities, and predicates can even be pushed down into the database itself. Its high compression ratio further saves storage.
The figure above shows the block index design. In the index database, data is grouped into data sources: for example, endpoint data forms one data source and network data another, the criterion being whether they share a unified schema. Within a single data source, the date serves as the partition, and each partition is filled with a large number of small ORC files. As for the index structure itself, we build a min/max index for every field, and a Bloom index for fields whose cardinality ratio is below 0.001.
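A hedged sketch of what one record of this block-index metadata could look like for a single ORC file; the field names are illustrative, not the platform's actual schema:

```java
import java.util.Map;

/** One index record describing a single small ORC file within a daily partition. */
public class BlockIndexEntry {
    String dataSource;          // e.g. "terminal" or "network", grouped by shared schema
    String partitionDate;       // daily partition, e.g. "2020-12-13"
    String filePath;            // path of the ORC file this entry describes

    // min/max statistics kept for every field of the data source's schema
    Map<String, String> minValues;
    Map<String, String> maxValues;

    // serialized Bloom filters, kept only for fields whose distinct-value ratio
    // (cardinality / row count) is below roughly 0.001
    Map<String, byte[]> bloomFilters;
}
```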
As mentioned above, security analysts like to use LIKE and full-text search. We optimized the LIKE path as well, which is covered later. For full-text search, we tokenize the data to build an inverted index, and also build a bitmap index at the file-distribution level for each individual token.
The figure above gives a rough sense of the index size: if the raw logs in JSON format are about 50 PB, the ORC version is about 1 PB, and our index data is 508 GB, of which 8 GB is the min/max index and 500 GB the Bloom index. With the bitmap and inverted indexes mentioned above, the share of index data becomes even larger, which is why we adopt a distributed indexing scheme.
We know that logs keep changing: for some data sources, fields are added or removed over time, and field types may even change.
For this we adopt the Merge Schema approach, updating the index information of each batch as data is written incrementally. As shown, in block 123, file 3 is the last one written; as files keep arriving, a new merged schema is formed. You can see that fields B and C are actually historical fields, while A_V is the historical-version field of field A. In this way customers see data that is as complete as possible. Finally, a new table source is built on top of the self-developed input format and the merged schema, connecting the whole pipeline.
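A simplified sketch of the Merge Schema idea: fields from older files are kept, and if a field reappears with a different type, its historical version is preserved under a renamed column (here "&lt;name&gt;_v", mirroring the A_V example above). The naming convention and types-as-strings representation are illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class MergeSchema {

    /** Merges the schema of a newly written file into the accumulated schema (name -> type). */
    static Map<String, String> merge(Map<String, String> merged, Map<String, String> newFile) {
        Map<String, String> out = new LinkedHashMap<>(merged);
        newFile.forEach((name, type) -> {
            String oldType = out.get(name);
            if (oldType != null && !oldType.equals(type)) {
                out.put(name + "_v", oldType);   // keep the historical version of the field
            }
            out.put(name, type);                 // the latest definition wins
        });
        return out;                              // fields only present in old files remain
    }
}
```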
II. Optimization and Exploration Aimed at Reducing IO (Liu Jia)
The previous section explained why we chose block indexes; this section explains how we use them. The core of a block index comes down to one word: “pruning.” Pruning means filtering out irrelevant files before the query is actually executed, so that as little data as possible enters the computing engine and throttling starts at the data-source side.
This diagram shows how the whole system uses the index DB for pruning:
- The first step is to parse the query and obtain the relevant filters. In the SQL statement on the left, there are two filter conditions: src_address = a certain IP, and occur_time > a certain timestamp.
- The second step is to push these conditions down into the meta table of the corresponding data source in the index DB to filter files (see the sketch after this list). src_address is a string field and is pruned using a combination of min/max and Bloom indexes; occur_time is a numeric time field, for which we prefer the min/max index for file pruning.
- The third step: once the file list is obtained, the TableSource is constructed together with the merged schema mentioned earlier.
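Below is a hedged sketch of the second step, pushing the extracted predicates down to the index database to get the list of candidate files. The table name, column names, and the bloom_maybe_contains UDF are assumptions for illustration, not the platform's real schema or API.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;

class FilePruner {

    /** Returns the ORC files that may contain rows matching the two example predicates. */
    static List<String> pruneFiles(Connection indexDb, String srcAddress, long occurTimeLowerBound)
            throws SQLException {
        // min/max pruning on occur_time, plus min/max and Bloom pruning on src_address;
        // bloom_maybe_contains(...) stands in for whatever function the index DB exposes.
        String sql =
            "SELECT file_path FROM block_index_meta " +
            "WHERE data_source = 'network' " +
            "  AND occur_time_max > ? " +
            "  AND src_address_min <= ? AND src_address_max >= ? " +
            "  AND bloom_maybe_contains(src_address_bloom, ?)";
        try (PreparedStatement ps = indexDb.prepareStatement(sql)) {
            ps.setLong(1, occurTimeLowerBound);
            ps.setString(2, srcAddress);
            ps.setString(3, srcAddress);
            ps.setString(4, srcAddress);
            List<String> files = new ArrayList<>();
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    files.add(rs.getString(1));
                }
            }
            return files;   // only these files are handed to the computing engine
        }
    }
}
```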
In addition, we made some improvements to the details when building the source. For example, when passing filters to the ORC reader, we clear the filters that have already been pushed down, to avoid filtering a second time on the engine side. Of course, we do not clear all of them; we keep the LIKE expressions, whose pushdown is discussed later.
Next, we will focus on four optimization points:
- First, the pruning rate has a theoretical upper limit when the data is not sorted. We sort the raw data with a Hilbert curve as it is written, to raise the pruning rate.
- Second, due to the particular nature of the security domain, threat detection relies heavily on the LIKE syntax, so we enhanced the ORC API to support LIKE pushdown.
- Third, because our usage scenarios depend heavily on joins, we also optimized join operations.
- Fourth, our system supports multiple file systems, so we chose Alluxio, a mature cloud-native data orchestration system, as the data cache to improve the locality of data access.
1. Theoretical upper limit of the pruning rate, and the Hilbert space-filling curve
Pruning can be abstracted as the probability of N balls being thrown into M buckets; we will state the conclusion directly. Assuming rows are distributed randomly and evenly across blocks, and both the total number of rows in all blocks and the total number of rows hit by the query condition are fixed, the block hit ratio is directly and positively correlated with “total rows hit / total number of blocks”.
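As a back-of-the-envelope sketch of that conclusion (under the same random, uniform-placement assumption, with M the total number of blocks and K the total number of hit rows), the probability that a given block contains at least one hit row is:

```latex
P(\text{block hit}) \;=\; 1 - \left(1 - \frac{1}{M}\right)^{K} \;\approx\; 1 - e^{-K/M}
```

So the hit ratio depends only on K/M; for K/M = 1 it is about 1 − e⁻¹ ≈ 0.63, roughly the 2/3 figure quoted in the first conclusion below.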
There are two conclusions:
- First, if the total number of hit rows equals the total number of blocks, that is, if the x-axis value is 1, then the hit ratio is about 2/3: two thirds of the blocks contain hit rows, so the upper limit of the block pruning rate is only 1/3. That is a very low figure, but it rests on the assumption of a random, uniform distribution of data, so to get a better distribution we need to sort the raw data as it is written.
- Second, with the total number of hit rows fixed, significantly reducing the number of rows per block (and thereby increasing the total number of blocks) also improves the block pruning rate. So we reduced the block size; based on test results, we set each file to 16 MB. Reducing the file size is simple; for sorting, we introduced the Hilbert space-filling curve.
Why the Hilbert curve? It comes down to two considerations:
- First, what path should we take to traverse a two-dimensional space so that the address sequence along the path is roughly ordered in either dimension? And why must every column, or every subset of columns, be ordered? Because the query conditions are not fixed as the system is used: data may be sorted on five fields while a query filters on only one or two of them. Hilbert sorting keeps multiple fields ordered both globally and locally.
- Second, there are many space-filling curves, such as the Z-order curve and the serpentine curve. As the two diagrams on the right show intuitively, the curve's path should have as few long-span jumps as possible, and point positions should stay as stable as possible across iterations. The Hilbert curve performs best among space-filling curves in these respects.
For the Hilbert implementation, we implement a UDF that takes column values as input, outputs a coordinate value on the curve, and then sorts the data by that coordinate value.
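To illustrate the sort key, here is the classic two-dimensional Hilbert index (the well-known xy2d mapping on a 2^order × 2^order grid). The production UDF would map several normalized or bucketed column values to a single Hilbert index in the same spirit, just in more dimensions; this 2-D version is only a sketch of the idea.

```java
public final class HilbertSortKey {

    /** Maps (x, y) in a 2^order x 2^order grid to its position along the Hilbert curve. */
    public static long xy2d(int order, long x, long y) {
        long n = 1L << order;       // grid side length
        long d = 0;
        for (long s = n >> 1; s > 0; s >>= 1) {
            long rx = (x & s) > 0 ? 1 : 0;
            long ry = (y & s) > 0 ? 1 : 0;
            d += s * s * ((3 * rx) ^ ry);
            if (ry == 0) {          // rotate/flip the quadrant to keep the curve continuous
                if (rx == 1) {
                    x = n - 1 - x;
                    y = n - 1 - y;
                }
                long t = x;
                x = y;
                y = t;
            }
        }
        return d;
    }
}
```

The write job would then sort (or range-partition) records by this key before handing them to the ORC writer, so that each small file covers a compact region of the multi-field space.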
We sampled 1,500 SQL statements from customer environments and filtered out those whose pruning rate was already 100%, that is, invalid statements that hit no files at all. That left 1,148 statements. Comparing the pruning rate with and without sorting, at the 95th percentile it rose from 68% to 87%, an increase of 19 percentage points. That may not sound like much, but against a large base, say 100,000 files, it is quite significant.
2. LIKE optimization based on dictionaries
We mentioned the particular nature of the security industry: threat detection relies heavily on LIKE queries, so we optimized them as well.
- First, we added a LIKE condition to the ORC API so that LIKE predicates in SQL can be pushed down to the ORC record reader.
- Second, we refactored the row-group filter logic of the ORC record reader. If the expression is a LIKE, we first read the dictionary stream of the field and check whether it contains the LIKE target string. If the value does not exist in the dictionary, the row group is skipped and its data stream and length stream need not be read at all, which greatly improves file reading speed. Later we also plan to build a dictionary index into the index database, so the dictionary filter can be pushed down into the database directly.
For example, as shown in the figure, the leftmost SQL has three expressions; the filter we send to the ORC reader contains only the last one, attachment_name like '%bid%'. The only row groups that really need to be read are those whose dictionaries contain “bid”, that is, filtering happens at the row-group level, further reducing the amount of data that reaches the computing engine.
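A hedged sketch of the dictionary check the refactored reader performs for a LIKE '%bid%' predicate: if no entry in the row group's string dictionary contains the target substring, the whole row group can be skipped without reading its data or length streams. The RowGroupDictionary interface below is an abstraction for illustration, not the actual ORC reader API.

```java
import java.util.List;

/** Abstraction over the dictionary stream of a string column within one row group. */
interface RowGroupDictionary {
    List<String> entries();   // distinct values of the column in this row group
}

final class LikeRowGroupFilter {

    /** Returns true if the row group may contain a value matching LIKE '%target%'. */
    static boolean mayContain(RowGroupDictionary dict, String target) {
        for (String value : dict.entries()) {
            if (value.contains(target)) {
                return true;   // at least one dictionary entry matches: the row group must be read
            }
        }
        return false;          // no entry matches: skip the row group entirely
    }
}
```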
3. Join optimization based on index
Join operations are widely used for matching threat intelligence. To accelerate join performance, filter pushdown of WHERE conditions alone is far from enough.
Flink already has a number of built-in join algorithms, such as broadcast join, hash join, and sort-merge join. Sort-merge join is very friendly to joins over pre-sorted tables, and as mentioned above we use the Hilbert curve to sort multiple fields jointly, so sort-merge join is out of scope for our optimization for now.
In addition, we know that join performance is positively correlated with the sizes of the left and right tables, and threat-intelligence joins are extremely sparse. Pruning the left and right tables in advance can therefore greatly reduce the data that enters the join stage.
As mentioned above, we have already built Bloom indexes for common fields, so it makes sense to pre-filter files with the Blooms that already exist, which also saves the time and cost of building them on the fly.
For broadcast join, we scan the small table directly, probe each record of the small table against the Bloom filters of the files belonging to the large table in turn, decide whether each data block is needed, and thereby pre-prune the table with the larger data volume.
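A simplified sketch of this broadcast-join pre-pruning step: every join key of the small table is probed against the per-file Bloom filters of the large table (as read back from the index DB), and only files that may contain at least one key are kept. Guava's BloomFilter type is used here purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

import com.google.common.hash.BloomFilter;

final class BroadcastJoinPruner {

    /** fileBlooms: large-table file path -> Bloom filter of its join-key column. */
    static List<String> pruneBigTableFiles(Map<String, BloomFilter<CharSequence>> fileBlooms,
                                           Set<String> smallTableKeys) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, BloomFilter<CharSequence>> e : fileBlooms.entrySet()) {
            for (String key : smallTableKeys) {
                if (e.getValue().mightContain(key)) {  // the file may hold a matching row
                    kept.add(e.getKey());
                    break;
                }
            }
        }
        return kept;                                   // files pruned here never reach the join
    }
}
```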
For hash join, as you can see, we can pre-join the join key at the file level by ANDing the Bloom filter of a file from the left table with the Bloom filter of a file from the right table. Only file pairs whose ANDed result is non-zero are kept, and the remaining files of each table are sent to the engine for the subsequent computation.
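A sketch of that file-level Bloom intersection, with java.util.BitSet standing in for the stored Bloom bitmaps. The test only works as stated when both bitmaps come from Bloom filters built with the same size and hash functions; an all-zero intersection then proves the two files share no join key, because Bloom filters have no false negatives.

```java
import java.util.BitSet;

final class HashJoinBloomPruner {

    /** True if a left-table file and a right-table file might share at least one join-key value. */
    static boolean mayJoin(BitSet leftFileBloom, BitSet rightFileBloom) {
        BitSet intersection = (BitSet) leftFileBloom.clone();
        intersection.and(rightFileBloom);   // bitwise AND of the two Bloom bitmaps
        return !intersection.isEmpty();     // all zeros => provably no common join key
    }
}
```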
For example, the three tables in the picture are Table 1, Table 2, and Table 3. From the index DB we can obtain table statistics, namely the number of files or the total file size of each table. The figure shows file counts: Table 1 has 1,000 files, Table 2 has 50,000, and Table 3 has 30,000.
We then apply the pre-join logic from the previous figure and estimate the cost of each join, letting the cheaper pre-joins run first. This greatly reduces intermediate results and improves join efficiency.
4. Alluxio as the cache for object storage
Because the underlying file storage systems vary, we chose the Alluxio data orchestration system. Its advantage is bringing data closer to the computing framework and using multi-level caching in memory or SSD to speed up file access. On a full cache hit, file access reaches memory-level I/O speed, so files are read from the underlying file system much less frequently, greatly relieving the pressure on it.
For our system, this means higher concurrency, and it is friendlier to queries with a low pruning rate, that is, queries that have to read a large number of files.
If the files have been loaded into the cache in previous queries, the query speed can be greatly improved.
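A minimal sketch of how the compute side can go through Alluxio instead of hitting the remote store directly: the file paths handed to the ORC readers simply use the alluxio:// scheme, and Alluxio's Hadoop-compatible client serves hot blocks from the local memory/SSD cache. The host, port, and paths below are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

final class AlluxioPathing {

    static FileSystem openCachedFs() throws Exception {
        Configuration conf = new Configuration();
        // Route the alluxio:// scheme to Alluxio's Hadoop-compatible file system client.
        conf.set("fs.alluxio.impl", "alluxio.hadoop.FileSystem");
        return FileSystem.get(new java.net.URI("alluxio://alluxio-master:19998/"), conf);
    }

    // A file handed to the ORC reader would then look like:
    // alluxio://alluxio-master:19998/warehouse/terminal_events/part-0001.orc
}
```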
After these optimizations, we ran a performance comparison test. We selected a 249 TB ES cluster for comparison; it uses 20 servers, while Flink uses two. To make the comparison in the chart more intuitive, we selected 16 test results.
In the chart, red and orange represent ES, blue is HQL before optimization, and green is HQL after optimization. The numeric labels above the bars show HQL's performance relative to ES; for example, the first label means HQL is five times as fast as ES. Statements 6 and 7 are slower than ES, mainly because HQL uses a block index while ES uses a row-level index that sits entirely in memory, so its retrieval is extremely fast. Statement 13 is slower because HQL's pruning rate is relatively poor when a not-equal condition is used.
Overall, the optimization effect is obvious: for most statements, query speed is equal to or even slightly better than ES, which fully meets customer expectations for long-term data storage and query.
III. Future Planning
The figure above shows our plan for the future. Customer sites often involve many BI dashboards and long-period report computations, so our next steps are to look into BI pre-computation, the containerization and JVM warm-up mentioned by Su Jun, ES compatibility, and improving multi-user concurrent query capability.