The article directories

    • preface
    • Key challenges of offline data: “Data skew”
    • Second, Hive optimization
    • 3. Optimization independent of Join
      • 3.1 Tilt optimization caused by group by
      • 3.2 Count DISTINCT Optimization
    • SQL > alter table Join
    • SQL > alter table Join big table
      • 5.1 Problem Scenario
      • Scheme 1: Convert to MapJoin
      • Option 2: Use the case when statement when joining
      • Scheme 3: multiple B table, and then modular JOIN
      • Plan 4: Dynamic bisection

preface

In front, we talked about the Hadoop principle of actual combat, the underlying principle of Hive practice, today to talk about the most concerned Hive optimization practice.

If you have experienced offline data processing, you know that Hive SQL optimization methods are closely related to data skew, so I will first talk about the basic concept of “data skew”, and then introduce Hive optimization solutions in various scenarios.

Hive optimization can be divided into join optimization and join independent optimization.

In fact, join-related optimization accounts for most of Hive optimization, and join-related optimization can be divided into join optimization that mapJoin can solve and join optimization that mapJoin cannot solve.

Key challenges of offline data: “Data skew”

First, the concept of “data skew” is introduced.

The “skew” should come from the skew distribution in statistics, and the skew of the data processing species is related to this.

For distributed data processing, we want the data to be evenly distributed to each processing node, as shown in the following figure:



However, due to problems of business data itself or distribution algorithm, the amount of data allocated to each node may not be what we expected, for example:



Even more extreme cases can occur:



In other words, the whole data processing task can only be completed until the node with the most data is finished processing the data, and the meaning of distribution is greatly reduced. Think of the stuck 99%.

In fact, even if each node is allocated roughly the same amount of data, the data may still skew. For example, considering the extreme problem of statistical word frequency, if all words assigned to a node are one word, it will take a long time to show that node, even if the amount of data is the same as that of other nodes.

Hive optimization uses various measures and methods to optimize and deal with the tilt problem in the above scenarios.

Second, Hive optimization

In the actual Hive SQL development process, Hive SQL performance problems are only a small part of the data dump.

A lot of times, Hive SQL runs slowly because developers don’t know enough about the data they’re using and some bad usage habits.

Developers need to make sure that:

  • Do the metrics that need to be calculated really need to be self-aggregated from the common detail layer of the data warehouse? Is the common summary layer developed by the data Common layer team ready to meet its needs? Common data warehouse layers that are usually well designed for common, KPI-related metrics and so on are certainly included and can be used directly.
  • Do you really need to scan so many partitions? For example, for sales detail transaction tables, scanning partitions for a year and scanning partitions for a week has two orders of magnitude of computational and IO overhead, and the time consumed is certainly different. As developers, we need to carefully consider the needs of our business and try not to waste computing and storage resources!
  • Try not to useselect * from your_tableThis way, you specify which columns are used.select coll, col2 from your_tableIn addition, filter conditions are added to the WHERE condition to remove irrelevant data rows and reduce the amount of data that needs to be processed and distributed in the MapReduce task.
  • The input file should not be a large number of small files. The default Input Split of Hive is 128MB (configurable). Small files can be merged into large files first.

Hive SQL can run for a long time, or even run out of time, when you need real Hive optimization technology!

3. Optimization independent of Join

Hive SQL performance problems are mostly related to join. For problems unrelated to join, there are groupby related skew and count distinct related optimization.

3.1 Tilt optimization caused by group by

The skew caused by group BY is mainly caused by the uneven distribution of input data rows according to the group by column.

For example, suppose that the order number is counted according to the fact table of suppliers’ sales details, then the order quantity of some large suppliers is obviously very large, while the order quantity of most suppliers is average. Since the group by is distributed to each Reduce Task according to the ID of suppliers, At this point, the Reduce Task assigned to the large supplier will allocate more orders, resulting in data skew.

For skew caused by group by, optimization measures are very simple, just set the following parameters:

set hive.map.aggr = true 
set hive.groupby.skewindata=true
Copy the code

In this case, Hive performs load balancing during data skew, and the generated query plan contains two MapReduce jobs.

  • In the first MapReduce Job, Map output result sets are randomly distributed to Reduce jobs. Each Reduce Job performs partial aggregation operations and outputs results. As a result, the same GroupBy Key may be distributed to different Reduces to achieve load balancing.
  • The second MapReduce Job is then distributed to Reduce jobs by GroupBy Key based on the preprocessed data (this process ensures that the same GroupBy Key is distributed to the same Reduce), and the final aggregation operation is completed.

3.2 Count DISTINCT Optimization

Use count DISTINCT with caution during Hive development because it can cause performance problems, such as the following SQL:

select count(distinct user) from some_table
Copy the code

Hive will distribute all Map output to Reduce tasks, which may cause performance problems. In this case, you can use group by and then count to optimize the SQL as follows:

select count(*) from (
	select user 
	from some_table 
	group by user
	) tmp;
Copy the code

The principle is as follows: Group by is used to remove weights first, and then the number of rows in group BY is counted.

SQL > alter table Join

Join-related optimizations are mainly divided into optimizations that mapJoin can solve (that is, large table joins small table) and optimizations that MapJoin cannot solve (that is, large table joins large table). Join a large table on a small table is relatively easy to solve. Join a large table on a large table is relatively complex and difficult to solve, but not unsolvable.

First, we introduce join optimization for large tables and small tables. Again, the sales details fact table is used as an example to illustrate the scenario where a large table joins a small table.

If the supplier has a rating, for example (five stars, four stars, two stars, one star), the business person wants to be able to analyze the daily sales of each supplier star and its proportion.

Developers typically write SQL like this:

select Seller_srar, count(order_id) as ordre_cnt
from (
	select order_id,seller_id 
    from dwd_sls_fact_detail_table
    where partition_value ='20170101'
) a
Left outer join(
	select seller_id,seller_star
    from dim_seller
    where partition_value='20170101'
) b
on a.seller_id = b.seller_id
group by b.seller_star;
Copy the code

However, as mentioned above, real-world 80/20 rules will result in orders being concentrated on a subset of suppliers, and good suppliers will usually have higher ratings, which will exacerbate the data skew. If not optimized, the above SQL will take a long time, or even run without results!

Generally speaking, there are a limited number of suppliers, such as thousands or tens of thousands, and the data volume is not very large, while the sales details fact table is relatively large. This is a typical problem of large join small table, which can be optimized by adding mapJoin hint. The optimized SQL is as follows:

select /*+mapjoin(b)*/ Seller_srar, count(order_id) as ordre_cnt
from (
	select order_id,seller_id 
    from dwd_sls_fact_detail_table
    where partition_value ='20170101'
) a
Left outer join(
	select seller_id,seller_star
    from dim_seller
    where partition_value='20170101'
) b
on a.seller_id = b.seller_id
group by b.seller_star;
Copy the code

/*+ mapJoin (b)*/ that is, mapJoin HIMT. If mapJoin multiple tables is required, the format is /*+ mapJoin (b,c,d)*/.

Hive is enabled for mapJoin by default. The following parameters are set:

Set hive.auto.convert.join=ture;
Copy the code

Mapjoin optimization is to join in the Map phase, rather than join on each Reduce task node after distribution according to join column as usual in the Reduce phase. There is no need for distribution, so there is no skewing problem. Instead, Hive copies the small table to each Map task node in full (in this case, dim_Seller, of course, only the columns specified by SQL in TABLE B), and then lookup the small table on each Map task node.

As can be seen from the above analysis, the small table should not be too large, otherwise full copy distribution is not worth the loss.

  • Actually Hive based on parametershive.mapjoin.smalltable.filesize(0.11.0 after this ishive.auto.convert.join.noconditionaltask.size) to determine whether the size of the small table meets the criteria (default: 25M).
  • In practice, the maximum value of this parameter can be modified. However, the maximum value cannot be greater than 1GB. (if the value is too large, the memory of the node where the Map task is located will overflow and Hive will report an error. In addition, the file size displayed by HDFS is the compressed size. When the file is loaded into the memory, the capacity will be increased by 10 times in some scenarios.

SQL > alter table Join big table

What if the small table DIM_SELLER described above in MAPJoin is large? Like more than 1GB? This is the problem of a large table joining a large table.

This kind of problem is relatively complex, we first introduce a specific problem scenario, and then introduce various optimization schemes based on this.

5.1 Problem Scenario

Let’s start with a hypothetical problem scenario:

Table A is A summary table, which summarizes the transaction summary information of sellers and buyers in the recent N days, that is, for each seller in the recent N days, the total number of orders and the total amount of each buyer. Here, N only takes 90 days, and the summary value only takes the number of transactions. Table A contains buyer_id, seller_ID, and pay_CNT_90d.

Table B is the seller’s basic information table, which contains a hierarchical rating information of the seller, for example, the seller is divided into six levels: S0, S1, S2, S3, S4, S5 and S6.

To obtain the results of each buyer in each level of sellers transaction ratio information, such as:

Some buyers S0:10%; S1:20%; S2:20%; S3:10%; S4:20%; S4:10%; S5:10%.

Table B contains seller_ID and s_level.

As in the example in mapJoin, our first reaction is to join the table directly and count:

select 
m.buyer_id 
,sum(pay_cnt_90d) as pay_cnt_90d 
,sum(case when m.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0 
,sum(case when m.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl 
,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2 
,sum(case when m.s level=3 then pay cnt 90d end) as pay_cnt_90d_s3
,sum(case when m.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4 
,sum(case when m.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5 
from 
(
select 
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d 
from 
(
select buyer_id ,seller_id,pay_cnt_90d 
from table A 
) a 
join 
(
select seller_id,s_level 
from table B 
) b 
on a.seller_id=b.seller_id 
) m 
group by m.buyer_id
Copy the code

However, this SQL can cause data skew due to the seller’s 80-20 rule. Some sellers will have millions or even tens of millions of buyers within 90 days, but most sellers will not have many buyers within 90 days. ODPS will distribute according to Seller_id when joining table_A and table_B, and large sellers of table_A cause data bias.

However, this data skewer problem cannot be solved by MapJoin table_B, because there are more than 10 million sellers and the file size is several GB, which exceeds the maximum limit of 1GB for MapJoin table.

Scheme 1: Convert to MapJoin

A large table cannot be directly mapJoin, so can it be indirectly? There are actually two ways to do this: restrict rows and restrict columns.

  • Restricted rows: do not need to join the entire table B, only those existing in table A. For this problem scenario, the sellers who have not closed within 90 days are filtered out.
  • Restricted columns: Only the required fields are fetched.
select 
m.buyer_id 
,sum(pay_cnt_90d) as pay_cnt_90d 
,sum(case when m.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0 
,sum(case when m.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl 
,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2 
,sum(case when m.s level=3 then pay cnt 90d end) as pay_cnt_90d_s3
,sum(case when m.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4 
,sum(case when m.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5 
from 
(
select /*+mapjoin(b)*/
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d 
from 
(
select buyer_id ,seller_id,pay_cnt_90d 
from table_A 
) a 
join 
(
select b0.seller id,s_level 
from table_B b0
    join
    (select seller_id from table_A group by seller_id) a0
    on b0.seller_id=a0.seller_id
) b 
on a.seller_id=b.seller_id 
) m 
group by m.buyer_id
Copy the code

This solution works in some cases, but in many cases it does not solve the above problem because most sellers have a few buyers, although 90 buyers are not many, and the filtered B table is still large.

Option 2: Use the case when statement when joining

Application scenarios: Skew values are explicit and few in number, such as skew caused by null values.

The core logic of randomly distributing these skew values to Reduce is to concat random numbers of these special values during join, so as to achieve the purpose of random distribution. The core logic is as follows:

Select a.user_id,a.order_id,b.user_id 
From table_a a 
Join table_b b 
On (case when a.user_id is null then concat ('hive' ,rand()) else a.user_id end)=b.user_id
Copy the code

Hive has been optimized for this function. You only need to set parameters without modifying SQL. For example, the values “0” and “1” of table_B cause skew, just set it as follows:

set hive.optimize.skewinfo=table_B:(seller_id)[("0")("1")];
set hive.optimize.skewjoin=true;
Copy the code

But plan TWO does not solve these problems, because the number of tilted sellers is large and dynamic.

Scheme 3: multiple B table, and then modular JOIN

  • General plan

Create a numbers table with a single column of int rows, e.g., 1 to 10(depending on the skew), then enlarge B table 10 times, and select module JOIN.

select 
m,buer_id
,sum(pay_cnt_90d) as pay_cnt_90d 
,sum(case when m.s_level=O then pay_cnt_90d end) as pay cnt 90d so 
,sum(case when m.s_level=l then pay cnt 90d end) as pay cnt 90d_sl 
,sum(case when m.s_level=2 then pay_cnt_90d end) as pay_cnt_90d s2 
,sum(case when m.s_level=3 then pay_cnt_90d end) as pay_cnt_90d_s3 
,sum(case when m.s_level=4 then pay_cnt_90d end) as pay cnt 90d s4 
,sum(case when m.s level=S then pay cnt 90d end) as pay cnt 90d s5 
from 
(
select 
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d 
from
(
select buyer_id,seller_id,pay_cnt_90d 
from table_A 
) a 
JOin 
(
select /*+mapjoin(members)*/ 
seller_id,s_level,member 
from table_B 
join 
numbers 
) b 
on a.seller_id=b.seller_id 
and mod(a.pay_cnt_90d,10)+1=b.number 
) m 
group by m.buyer_id
Copy the code

Since seller_id is skewed, manually add a column and the skew of the previously skewed value is reduced by 1/10. You can reduce the skew by configuring the numbers table to change the magnification, but the downside is that table B will expand by a factor of N.

  • Proprietary solutions

The general scheme idea is to enlarge every data in table B by the same multiple. In fact, it only needs to enlarge the large sellers by multiple.

First of all, we need to know the list of top sellers, that is, we need to create a temporary table to dynamically store the latest top sellers every day (such as DIM_big_seller), and at the same time, the top sellers in this table should be inflated by a preset multiple (such as 1000 times).

Create A join column in table A and B respectively. The logic is as follows: if A large seller is A large seller, concat A random positive integer (between 0 and A predefined multiple, in this case 0-1000). If not, stay the same.



Compared to the general scheme, the dedicated scheme is significantly more efficient, because only the large seller’s rows in table B are enlarged by 1000 times, while the other sellers’ rows remain the same. However, you can also see that the code is much more complex, and the large seller table must be established first.

Plan 4: Dynamic bisection

In fact, both schemes 2 and 3 use the idea of splitting into two, but they are not complete. For the problems that mapJoin cannot solve, the ultimate solution is dynamic splitting, that is, processing tilted key values and non-tilted key values separately, and normal join with non-tilted key values is ok. Find them at a tilt and do mapjoin, and then union all.

But this solution is cumbersome, the code becomes complex and requires a temporary table to hold slanted keys.

-- For sellers with more than 10000 buyers in 90 days, map Join directly; for other sellers, join normally
select 
m.buyer_id 
,surn(pay_cnt_90d) as pay_cnt_90d 
,surn(case when rn.s_level=O then pay_cnt_90d end) as pay_cnt_90d_s0 
,surn(case when rn.s_level=l then pay_cnt_90d end) as pay_cnt_90d_sl 
,surn(case when rn.s_level=2 then pay_cnt_90d end) as pay_cnt_90d_s2 
,surn(case when rn.s_level=3 then pay_cnt_90d end) as pay_cnt_90d_s3 
,surn(case when rn.s_level=4 then pay_cnt_90d end) as pay_cnt_90d_s4 
,surn(case when rn.s_level=S then pay_cnt_90d end) as pay_cnt_90d_s5
from
(
select 
    a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d
    from
    (
    select buyer_id,seller_id,pay_cnt_90d
        from table_A
    ) a
    join
    (
    	select seller_id ,a.s_level
        from table_A a
        left outer join tmp_table_B b
        on a.user_id = b.seller_id
        where b.seller_id is null
    ) b 
on a.seller id=b.seller id 
union all 
select /*+mapjoin(b)*/ 
a.buyer_id,a.seller_id,b.s_level,a.pay_cnt_90d 
from 
select buyer_id,seller_id,pay_cnt_90d 
from table A 
) a 
join 
select seller_id,s_level 
from table B 
) b 
on a.seller id=b.seller id
) m group by m.buyer_id
) m
group by m.byer_id
Copy the code

In summary, the generic solutions in scenarios 1, 2, and 3 are not guaranteed to solve large table join large table problems because of various limitations and specific usage scenarios.

The dedicated scheme of scheme 3 and scheme 4 are recommended optimization schemes, but they both need to create a temporary table to store the large sellers that change dynamically every day.

Compared to scheme 4, the special scheme of scheme 3 does not need to modify the code framework, but table B will be enlarged, so it must be a dimension table, otherwise the statistical results will be wrong. Solution 4 is the most common solution with the highest degree of freedom, but it also has the most changes to the code, even requiring changes to the code framework, which can be used as the ultimate solution.

I am “Qi Yun”, a big data development ape who loves technology and can write poems.