Preface
BloomFilter, as a mature algorithm, data structure, and optimization technique, has been implemented in many MPP databases and can be called directly. In Doris, for example, it can be used as a column index type, much like the Bitmap index, which supports storing multi-row index values in a bitmap. Hive, however, was developed early, is batch-oriented by design, and offers no such out-of-the-box BloomFilter support. This is frustrating for big data developers who use Hive or SparkSQL as their offline processing engine, because an excellent optimization method seems to be out of reach. I ran into exactly this problem while refactoring some of my company's legacy jobs (which still use Hive as the processing engine), so I investigated how BloomFilter can be used within the Hive technology stack. This article presents the results of that research one by one, in the hope that it helps those who come after.
Technology stack selection
If your company is flexible about its technology stack, I recommend either a community-popular MPP database that already implements BloomFilter, or Spark with Scala (note: Spark proper, not SparkSQL, and version 2.x+), which is also cheap to adopt. When the only stacks available to you are Hive or SparkSQL, you will need UDF functions to bring BloomFilter support to your jobs. In the rest of this article, I describe how BloomFilter is implemented under these two stacks, cover several BloomFilter implementations that are currently popular on SQL platforms, and explain how each is used in the two scenarios along with its pros and cons.
Implementation of BloomFilter in Spark on Scala
For big data developers with a Spark 2.x + Scala development environment, this scenario is the easiest to implement. The official Spark 2.x library already provides a BloomFilter class (the builder is exposed through a Dataset's stat functions, and the class itself lives in org.apache.spark.util.sketch), so you only need to know how to use it.
Joining a large table with a small/medium table
Here, the scenario is that the small/medium table is still too big (well over the roughly 25 MB broadcast threshold) to fit into a broadcast variable, and only a small fraction of the large table's rows actually match it after the join. Suppose you have two tables to join, df2 being the large table and df1 the small/medium one. The idea is to take key1, the finest-grained field of the join key shared by df1 and df2, and pour df1's values into a BloomFilter, forming a deduplicated key set. Before the join, df2 is filtered through this Bloom filter (time complexity O(N), and it is a map-only operation). The filtered df2 is much smaller than before, so during the join the cache memory wasted on rows that will never match, the shuffle memory, and the disk spill are all greatly reduced, which saves far more time than building the Bloom filter costs. The parameters passed to bloomFilter in the code below are the join key column, the expected number of distinct ids, and the false positive rate (the hash functions may report data that is not in the set as being in the set).
val df1_bloomfilter = df1.stat.bloomFilter($"key1", 25000, 0.05)
val udf_df1_bloomfilter = udf((x: String) => df1_bloomfilter.mightContain(x))
val _df2 = df2.filter(udf_df1_bloomfilter($"key1"))
val final_df = df1.join(_df2, Seq("key1"), "left_outer").na.fill("default")
Exact deduplication over big data
Here is a brief description of the scenario and the idea. Suppose you need the exact distinct count of ids over a very large data set, new ids keep arriving every day, and the upper DWS layer produces a wide range of composite indicators built on top of the atomic, id-level indicators. Those composite indicators need distinct counts not just over each daily increment or the currently active ids, but over a historical period or even the entire history of ids. RoaringBitmap lets us compute such UV numbers efficiently.
Step one, bring in the RoaringBitmap library: import org.roaringbitmap.RoaringBitmap.
Step two, maintain a serial number for every id (for why this mapping is worth maintaining, see this introduction to bitmaps: mp.weixin.qq.com/s/xxauNrJY9…). We sort the set of ids lexicographically by the id itself and assign serial numbers 0 through N in sorted order; each day, newly arrived ids are sorted and assigned new serial numbers in the same way. This gives us a mapping table between ids and serial numbers.
Step three, join the serial numbers of the ids back onto the DWD wide table, which has id as its primary key, day as its partition, and carries all the statistical dimensions. Then, whenever the DWS layer needs to aggregate by some combination of dimensions and count distinct ids within it, we create a new Roaring64NavigableMap (which can loosely be thought of as an array of longs) and put that day's serial numbers for that dimension combination into it; its cardinality (getLongCardinality() in this class) is the exact deduplicated count. When we need the deduplicated UV across multiple days, we simply OR the corresponding daily Roaring64NavigableMap structures together (the or() method of this class).
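To make the bitmap side of this concrete, here is a minimal, self-contained sketch, assuming the RoaringBitmap library is on the classpath; the hard-coded per-day serial numbers are hypothetical stand-ins for the serials you would read from the id-to-serial mapping table described above.

import org.roaringbitmap.longlong.Roaring64NavigableMap

object ExactUvSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical serial numbers for two days of one dimension combination.
    val day1Serials = Seq(0L, 1L, 2L, 3L)
    val day2Serials = Seq(2L, 3L, 4L, 5L)

    // One Roaring64NavigableMap per day, holding that day's serial numbers.
    val day1 = new Roaring64NavigableMap()
    val day2 = new Roaring64NavigableMap()
    day1Serials.foreach(s => day1.addLong(s))
    day2Serials.foreach(s => day2.addLong(s))

    // Exact deduplicated count for a single day.
    println(day1.getLongCardinality()) // 4

    // Multi-day deduplicated UV: OR the daily bitmaps together.
    day1.or(day2)
    println(day1.getLongCardinality()) // 6
  }
}

The same pattern extends to any number of days or dimension combinations: keep one bitmap per (day, dimension) cell and OR the cells you need before taking the cardinality.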
Implementation of BloomFilter in Hive/SparkSQL
Brickhouse
First step, register the function that builds the BloomFilter and the function that checks whether a key is in the BloomFilter. You can clone the Brickhouse repository to find the class paths, then create the temporary UDF functions one by one. For example, to use the Brickhouse Bloom filter you need to register these three functions in sequence:
CREATE TEMPORARY FUNCTION bloom AS 'brickhouse.udf.bloom.BloomUDAF' USING JAR '/tmp/xxx/brickhouse-0.7.1-SNAPSHOT.jar';
CREATE TEMPORARY FUNCTION distributed_bloom AS 'brickhouse.udf.bloom.DistributedBloomUDF' USING JAR '/tmp/xxx/brickhouse-0.7.1-SNAPSHOT.jar';
CREATE TEMPORARY FUNCTION bloom_contains AS 'brickhouse.udf.bloom.BloomContainsUDF' USING JAR '/tmp/xxx/brickhouse-0.7.1-SNAPSHOT.jar';
The second step is to build the BloomFilter. Feed the table's key column through an aggregation into the UDAF bloom(key_id), which constructs the BloomFilter, and write the result to a local bloom file with INSERT OVERWRITE LOCAL DIRECTORY (why it has to be written out as a file rather than kept as an intermediate data set will become clear later when we compare this with Hivemall). Because the filter has been written to disk, you then need to load the file back into a cache so that the tasks of the current job, spread across multiple machines, can use it for filtering: we push the file into the distributed cache and read it with distributed_bloom('mybloom'). The third step is to use the BloomFilter now sitting in the distributed cache to screen the main table. The UDF call bloom_contains(key, distributed_bloom('mybloom')) = true keeps only the records whose keys exist in the BloomFilter.
Brickhouse's BloomFilter implementation inherits from the Filter class and stores its bits in a byte array that grows as elements are added, appending new bytes to the array. A byte has 8 bits, so each byte element covers only 8 bit positions versus a long's 64, but in theory the two layouts should occupy the same amount of space. This leads to another problem: when your data set is large enough (say, 50 million distinct ids), the BloomFilter file that has to be stored on the local machine becomes very large, and once it is large enough it starts eating the disk space and physical memory that the machine needs for normal services. For example, it can deprive the local HiveServer of the space it needs to write its logs and bring HiveServer down (don't ask me how I know that).
As a side note, in our tests this Filter-derived, byte-array-based Bloom filter needed about 65 MB to store 5 million ids, whereas RoaringBitmap can reportedly store 20 million elements in only about 3 MB. So use the more advanced RoaringBitmap whenever you can.
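To show how these pieces line up, here is a rough sketch under stated assumptions: small_table, big_table, the key_id column, the local path, and the file name mybloom are all hypothetical placeholders, and the step of renaming the produced part file and adding it to the distributed cache is my assumption about how the file reaches distributed_bloom('mybloom'); only the bloom, distributed_bloom, and bloom_contains functions registered above are used.

-- Step 2: aggregate the keys of the small table into a BloomFilter and write it out locally
INSERT OVERWRITE LOCAL DIRECTORY '/tmp/bloom_out'
SELECT bloom(key_id) FROM small_table;

-- (assumption) rename the produced part file to /tmp/bloom_out/mybloom, then
-- register it in the distributed cache so every task of the job can read it
ADD FILE /tmp/bloom_out/mybloom;

-- Step 3: screen the large table map-side with the cached BloomFilter
SELECT *
FROM big_table
WHERE bloom_contains(key_id, distributed_bloom('mybloom'));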
Hivemall, part one
Hivemall's BloomFilter is used as follows. First step, register the corresponding BloomFilter functions, just as above. Second step, read the table whose keys need to go into the BloomFilter, run the aggregation, and add the corresponding key to the BloomFilter. A code example follows:
bloom_cuid AS (
    SELECT app, bloom_build(cuid) AS bloom_key
    FROM (
        SELECT
            cuid,
            app
        FROM actcuid_info
    ) tmp
    GROUP BY app
),
Note that once the BloomFilter is built this way, not only can we create one BloomFilter per GROUP BY aggregation, we can also use the result as just another table/data set, without spending any read/write I/O to save it locally and read it back. Don't celebrate yet, though; keep reading. Third step, join the large table with the BloomFilter table on the key, and call the UDF bloom_contains() on each joined record to label each row of the large table, that is, whether its key would match the small table. The code is as follows:
SELECT CASE WHEN b.bloom_key IS NULL THEN false
            ELSE bloom_contains(b.bloom_key, a.cuid)
       END AS is_updated
FROM ( ... ) a
LEFT JOIN (SELECT bloom_key, app FROM bloom_cuid) b
    ON (a.app = b.app)
As you write this, you will be pleasantly surprised by how convenient this BloomFilter implementation is and how naturally it blends into Hive code. But then you run it and, boom, it throws an error. Why does building the BloomFilter fail? Is the key too long? Is there too much data? Looking at the source of the implementation, I found that this BloomFilter is also built on the Filter class (even though the library was written much later than Brickhouse). The class initializes the BloomFilter with the line: return new DynamicBloomFilter(DEFAULT_BLOOM_FILTER_SIZE, DEFAULT_ERROR_RATE, NUM_HASHES); and the default value of DEFAULT_BLOOM_FILTER_SIZE is 1024*1024, only about one million. The UDF does not let you pass this value in, so once the number of ids grows large an error is unavoidable. I changed the value used when the BloomFilterUDAF class is initialized to 40 million and rebuilt the jar as a quick fix.
With the BloomFilter built, the second job is the join against the large table, and here something strange happens again: in the reducer stage of the join, the engine assigns only one task! However fast a BloomFilter membership test is, a single task crawling through hundreds of millions of rows is slow. The BloomFilter table has the dimension combination as its key and the BloomFilter as its value, and in this implementation the value is one very long string. It takes up too much space to load into a broadcast variable, so a broadcast join is not used; with only a common join available, hundreds of millions of rows are joined against a single record, everything hashes to one partition, and therefore only one reducer task is assigned. Brickhouse's implementation, by contrast, effectively simulates putting the BloomFilter into a broadcast variable while sidestepping the Hive engine's limits on broadcast variable size: the BloomFilter is distributed to the machines processing the large table, and bloom_contains does a map-side filter with no expensive shuffle phase at all, which is far better than Hivemall here (at the cost, of course, of the large local file and the pain of writing it to local disk).
Hivemall, part two
There are two possible ways around Hivemall's common-join problem: one is to store the single long BloomFilter string as an intermediate variable in the job's cache; the other is to split the join between the BloomFilter and the large table across multiple tasks. For the first idea I have not yet found a good solution (if you have one, please share it in the comments below); here is the second. A large table joined against a single record: doesn't that feel like a problem we run into all the time in big data? Exactly, data skew! It is the same situation as joining a large table with a badly skewed small table where most keys share one value, so too many records hash to a single reducer and processing crawls. Since the BloomFilter side has only one record, we can replicate that record 100 times, each copy carrying a new salt key (a serial number), and give every record of the large table a random serial number between 0 and 99 (the classic salting trick for skew). During the join, this new key spreads the data evenly across the reducers, and because every reducer sees an identical copy of the BloomFilter, the results are not affected. A code example is as follows:
SELECT ...
FROM (
    SELECT floor(rand()*100) AS salt_key, ...
    ...
) a
LEFT JOIN (
    SELECT bloom_key, app, salt_key
    FROM bloom_cuid x
    LATERAL VIEW explode(split('0,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99', ',')) t AS salt_key
) b
ON (a.app = b.app AND a.salt_key = b.salt_key)
The problem with this trick is that it raises the computation cost considerably. A single BloomFilter produced by the UDAF can take tens to hundreds of megabytes, and making 100 copies of it can push the whole job's memory footprint to several gigabytes or more. On top of that, the shuffle caused by the join (the shuffle that finally brings the data back together) consumes additional resources.
Repository addresses
The repositories of the two Hive UDF libraries: Brickhouse: github.com/klout/brick… Hivemall: github.com/apache/incu…
Thanks for reading!