1. Preface

For popular distributed computing frameworks (such as batch MapReduce, streaming Storm, in-memory iterative Spark, and streaming Flink), "large data volume" by itself has never been the real problem, because in theory it can be handled by adding more concurrent nodes.

But if the data is skewed, that is, unevenly distributed, it becomes a problem. At that point the problem can no longer be solved simply by adding concurrent nodes; it must be addressed with targeted measures and optimization schemes.

This is the main topic of this article. In fact, the various Hive SQL optimization techniques are closely related to data skew, so this article first introduces the basic concept of data skew and then describes Hive optimization schemes for various scenarios.

Hive optimization can be divided into join-related optimization and join-unrelated optimization. Join-related optimization accounts for most of the Hive content, and it can be further divided into joins that map join can solve and joins that map join cannot solve. This article introduces the principles and optimization schemes one by one.

2. Main challenges of offline data processing: data skew

Before discussing the optimization of each Hive scenario, this section describes the concept of data skew.

In fact, there is no precise theoretical definition of data skew. The term comes from the skewed distribution in statistics: a frequency distribution whose peak does not coincide with its mean. Depending on whether the peak lies below or above the mean, the distribution is called positively or negatively skewed, and the degree of deviation can be described by the skewness coefficient. Skewness in statistics and skew in data processing mean different things, however; data skew in data processing is described below.

For distributed data processing, we want the data to be evenly distributed to each processing node. If the processing nodes are on the X-axis and the amount of data processed by each node is on the Y-axis, we would like the histogram to look like the following (roughly uniform bars):

But in reality, due to problems with the business data itself or with the distribution algorithm, the amount of data allocated to each node is more likely to look like the following (some nodes receive noticeably more data than others):

In more extreme cases, it could look like this (almost all of the data lands on a few nodes):

In other words, the whole data processing task cannot finish until the node with the most data has finished processing, so the benefit of distributing the work is greatly reduced. In fact, even if each node is allocated roughly the same amount of data, processing can still be skewed. Consider the extreme case of word-frequency counting: if a node is allocated only a single word, that node will obviously take a long time, even though it holds the same amount of data as the other nodes.

Summary: Hive data skew refers to uneven data distribution caused by the shuffle during MapReduce.

How do you recognize a data skew problem in Hive SQL? Typical symptoms:

  • A Reduce task gets stuck at 99.9% for a very long time.
  • A Reduce task processes a huge amount of data, causing long stop-the-world pauses during full GC; it fails to report progress within the default 600-second timeout and is killed.

Hive optimization uses various measures and methods to handle the skew problems in the preceding scenarios.

3. Problems to pay attention to during development

In actual Hive SQL development, data skew accounts for only a small portion of performance problems. Much of the time, Hive SQL runs slowly because developers don't know enough about the data they are using and have some bad usage habits.

A few things to note for developers:

  • Do the metrics that need to be calculated really have to be aggregated by yourself from the common detail layer of the data warehouse? Can the common summary layer built by the data platform team already meet the need? Commonly used, KPI-related metrics are usually already covered by a well-built data warehouse and can be used directly.
  • Do you really need to scan that many partitions? For example, for a sales detail transaction table, scanning a year of partitions versus a week of partitions differs by roughly two orders of magnitude in computation and I/O overhead, and the time consumed will certainly differ. This is not to say you can never scan a year of partitions, but developers need to think carefully about the business need and try not to waste computing and storage resources, because big data is not free.
  • Do not use select * from table. Specify the columns actually used, such as select col1, col2 from table. In addition, add filter conditions in the WHERE clause to remove irrelevant rows, reducing the amount of data the MapReduce job has to process and distribute (see the sketch after this list).
  • The input should not consist of a large number of small files. The default input split size in Hive is 128MB (configurable); small files should be merged into larger files first.
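A minimal sketch of the column-pruning and partition-filtering advice above; the table and column names are hypothetical:

-- Bad: reads every column of every partition
-- select * from sale_detail;

-- Better: read only the needed columns and only the needed partitions
select
  order_id,
  seller_id,
  pay_amount
from
  sale_detail
where
  day between '2020-11-19' and '2020-11-25'  -- partition filter: one week, not the whole history
  and pay_amount > 0;                        -- row filter: drop irrelevant rows early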

If, after all of the above has been checked, the Hive SQL still runs for a very long time or even fails to finish, it is time to apply real Hive optimization techniques.

The following describes Hive optimization methods for each scenario, but developers need to understand their own SQL and identify, from the slow parts of the execution, which of the following targeted solutions to adopt.

4. Specific optimizations

4.1 Skew optimization for group by

The skew caused by group by stems from the uneven distribution of input rows across the group by column. For example, suppose we count the 2020 order volume of every Taobao store: a few large stores obviously have a very large number of orders, while most stores have an average number. Since the rows are distributed to Reduce tasks by store ID, the Reduce tasks assigned the large stores receive far more orders, resulting in data skew.

For skew caused by group by, the optimization measures are very simple. You only need to set the following parameters:

set hive.map.aggr = true 
set hive.groupby.skewindata=true

In this case, Hive performs load balancing when data is skewed, generating two MapReduce jobs in the query plan. In the first job, the map output is randomly distributed to the Reduce tasks, and each Reduce task performs a partial aggregation and outputs the result; this way, rows with the same group by key may end up on different Reduce tasks, achieving load balancing. The second job then distributes the pre-aggregated results to Reduce tasks by the group by key (this guarantees that the same group by key goes to the same Reduce task) and completes the final aggregation.
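Conceptually, this two-job plan is similar to salting the group by key yourself and aggregating in two stages. The following is only an illustrative sketch of that idea (store_order_table and its columns are hypothetical); with the two parameters above, Hive does the equivalent automatically:

select
  store_id,
  sum(partial_cnt) as order_cnt
from
  (
    -- stage 1: add a random salt so a hot store is spread over several reducers,
    -- then pre-aggregate within each (store, salt) bucket
    select
      store_id,
      count(order_id) as partial_cnt
    from
      store_order_table
    group by
      store_id,
      floor(rand() * 10)
  ) t
-- stage 2: merge the partial counts per real key
group by
  store_id;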

4.2 count distinct optimization

Use count distinct with caution during Hive development because it can cause performance problems, as in the following SQL:

select count(distinct user_id) from user_table

Because deduplication is required, Hive distributes all map output to a single Reduce task, which may cause performance problems. In this case, use group by followed by count to optimize the SQL as follows:

select count(1) from (select user_id from user_table group by user_id) a

The principle: group by is used to deduplicate first, and then the number of deduplicated rows is counted.

4.3 Local mode (fetch mode)

Sometimes running a full MapReduce job for a simple Hive SQL statement is very inefficient. Hive can be configured so that certain statements skip MapReduce entirely and read the data directly. This is controlled by the hive.fetch.task.conversion parameter, which has the following three values:

set hive.fetch.task.conversion=none

In this case, only the desc statement avoids MapReduce.

set hive.fetch.task.conversion=minimal

In this case, desc, select *, filtering on partition columns, and limit avoid MapReduce.

set hive.fetch.task.conversion=more

In this case, desc and simple select statements, all filter conditions, and limit avoid MapReduce.
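For example, with the more setting, a simple projection with a filter and a limit can return directly without launching a MapReduce job (user_table is the table from the count distinct example; whether a particular statement is fetched still depends on the Hive version):

set hive.fetch.task.conversion=more;
select user_id from user_table where user_id > 100 limit 10;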

4.4 Parallel execution

Hive transforms a query into one or more stages: MapReduce stages, sampling stages, merge stages, limit stages, or other stages Hive may need during execution. By default, Hive executes only one stage at a time. However, a particular job may contain many stages, and they are not necessarily all dependent on each other; some stages can run in parallel, shortening the overall execution time of the job. The more stages that can run in parallel, the faster the job is likely to complete.

Parallel execution is enabled by setting a parameter. Note that in a shared cluster, increasing the number of parallel stages of a job increases its cluster utilization:

set hive.exec.parallel = true;
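A companion parameter caps how many stages may run at the same time; the value below is only illustrative:

set hive.exec.parallel = true;
set hive.exec.parallel.thread.number = 8;  -- maximum number of stages executed in parallel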

4.5 Strict mode

Hive provides a strict mode that prevents users from executing queries that might have unintended adverse effects.

Setting hive.mapred.mode to strict disables the following three types of queries:

  • First, for partitioned tables, queries are not allowed to run unless the WHERE clause contains a filter on a partition column to limit the data range; in other words, users are not allowed to scan all partitions. The reason for this restriction is that partitioned tables usually hold very large, fast-growing data sets, and an unrestricted query could consume an unacceptably large amount of resources:
hive> select distinct(user_id) from hive_trace where user_id > 1;
FAILED: SemanticException Queries against partitioned tables without a partition filter are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.large.query to false and that hive.mapred.mode is not set to 'strict' to proceed. Note that if you may get errors or incorrect results if you make a mistake while using some of the unsafe features. No partition predicate for Alias "hive_trace" Table "hive_trace"
  • Second, if an order by statement is used, a limit statement is required, because order by distributes all result rows to a single reducer to perform the sort; forcing the user to add a limit clause prevents that reducer from running for an excessively long time:
hive> select user_id from hive_trace where event_month = '202010' order by user_id;
FAILED: SemanticException 1:69 Order by-s without limit are disabled for safety reasons. If you know what you are doing, please set hive.strict.checks.large.query to false and that hive.mapred.mode is not set to 'strict' to proceed. Note that if you may get errors or incorrect results if you make a mistake while using some of the unsafe features. Error encountered near token 'user_id'
  • The third and final case restricts queries that produce a Cartesian product. Users familiar with relational databases may expect to express the join condition in a WHERE clause instead of an ON clause, relying on the database's optimizer to efficiently turn that WHERE clause into the ON clause. Unfortunately, Hive does not perform this optimization, so if the tables are large enough the query becomes uncontrollable:
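A minimal sketch of the kind of query this rule targets, reusing the example tables from above (the exact error text depends on the Hive version):

-- Strict mode rejects a join whose condition appears only in WHERE,
-- because Hive would first compute the full Cartesian product:
select a.user_id
from hive_trace a
join user_table b
where a.user_id = b.user_id;

-- The safe form puts the join condition in ON:
select a.user_id
from hive_trace a
join user_table b on a.user_id = b.user_id;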

4.6 Adjusting the number of map tasks and reduce tasks

Hive divides a query into one or more MapReduce jobs to achieve parallelism. Each job may have many mapper and reducer tasks, at least some of which can run in parallel. Determining the optimal number of mappers and reducers depends on multiple variables, such as the amount of input data and the type of operations performed on that data.

It is necessary to keep a balance. Too many mapper and reducer tasks cause excessive overhead in startup, job scheduling, and execution; too few fail to exploit the cluster's inherent parallelism.

When a Hive query has a reduce phase, the CLI console prints the number of reducers it has estimated. Consider a query that includes a group by statement: such queries always require a reduce phase. By contrast, many queries are converted into jobs that need only a map phase. The number of reducers is determined from the input data volume; the dfs -count command, similar to the Linux du -s command, calculates the total size of all data under a specified directory:
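For example, the check can be run from the Hive CLI (the warehouse path below is hypothetical):

dfs -count /user/hive/warehouse/user_table;

The third column of the output is the total content size in bytes; Hive divides this input size by hive.exec.reducers.bytes.per.reducer to estimate the number of reducers.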

In some cases, the map phase of a query may generate much more data than it actually reads as input; if so, a reducer count based on the input size will be too small. Similarly, the map phase may filter out a large portion of the input, in which case only a few reducers are needed.

A quick way to handle this is to set the reducer count to a fixed number rather than letting Hive calculate it. If you recall, Hive's default reducer count is 3. You can set the mapred.reduce.tasks property to different values to determine whether more or fewer reducers shorten the execution time. Keep in mind that this kind of benchmarking is complicated by external factors, such as other users running jobs concurrently. Hadoop also needs several seconds to start up and schedule map and reduce tasks; take these factors into account when running performance tests, especially when the job is small.
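A sketch of the settings involved (mapred.reduce.tasks is the property mentioned above; hive.exec.reducers.bytes.per.reducer is the input-size-based knob Hive uses when the count is not fixed; the values are only illustrative):

-- let Hive derive the reducer count from input size (bytes handled per reducer)
set hive.exec.reducers.bytes.per.reducer = 256000000;
-- or fix the reducer count explicitly for benchmarking
set mapred.reduce.tasks = 10;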

The hive.exec.reducers.max property is important for controlling resource utilization when running large jobs on a shared cluster. A Hadoop cluster can offer only a fixed number of map and reduce slots; one large job could consume all of them and prevent other jobs from running. Setting hive.exec.reducers.max prevents a single query from consuming too many reducer resources. It is worth configuring this property in $HIVE_HOME/conf/hive-site.xml. A suggested formula for this value is:

(Total number of reduce slots in the cluster x 1.5) / (average number of queries executing concurrently)

4.7 JVM reuse

JVM reuse is a Hadoop tuning parameter that has a significant impact on Hive performance, especially for scenarios where it is difficult to avoid small files or where there are a large number of tasks, most of which have very short execution times.

By default, Hadoop forks a new JVM to execute each map and reduce task. The JVM startup overhead can be considerable, especially when a job runs hundreds or thousands of tasks. JVM reuse allows a JVM instance to be reused up to N times within the same job. The value of N can be set in Hadoop's mapred-site.xml ($HADOOP_HOME/conf):

<property>
      <name>mapred.job.reuse.jvm.num.tasks</name>
      <value>10</value>
      <description>
       How many tasks to run per jvm. If set to -1, there is no limit.
      </description>
</property>

Summary: one JVM can run multiple tasks in succession, which fundamentally reduces the time spent starting and destroying JVMs and improves the overall execution efficiency of the job.

4.8 Multiple group by in a single MapReduce

Another special optimization assembles multiple group by operations in a query into a single MapReduce job. To enable this optimization, the group by operations need to share a common set of group by keys:

<property>
      <name>hive.multigroupby.singlemr</name>
      <value>false</value>
</property>
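A typical case is a multi-insert query whose group by clauses share the same keys; a minimal sketch (the target tables and the pay_amount column are hypothetical, order_table is the order table from the earlier example):

from order_table
insert overwrite table seller_order_cnt
  select seller_id, count(order_id) group by seller_id
insert overwrite table seller_order_amt
  select seller_id, sum(pay_amount) group by seller_id;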

4.9 Large table join small table optimization

First, let's look at join optimization for a large table joined with a small table. Again, the sales detail fact table is used to illustrate the scenario. Suppose sellers are rated (five stars, four stars, three stars, two stars, one star) and the business side wants to analyze the daily order volume of each seller star level and its proportion. A developer would write the following SQL:

select 
  b.seller_star, 
  count(a.order_id) as order_cnt 
from 
  (
    select 
      order_id, 
      seller_id 
    from 
      order_table 
    where 
      day = '2020-11-25'
  ) as a 
  left outer join (
    select 
      seller_id, 
      seller_star 
    from 
      dim_seller 
    where 
      day = '2020-11-25'
  ) as b on a.seller_id = b.seller_id 
group by 
  b.seller_star;

However, as mentioned earlier, the 80/20 rule of the real world means orders will be concentrated on a subset of sellers, and good sellers usually have higher ratings, which exacerbates the data skew. Without optimization, the above SQL will take a very long time, or even never finish.

Generally speaking, the number of sellers is limited, say thousands or tens of thousands, so that data volume is not very large, while the sales detail fact table is relatively large. This is a typical case of a large table joined with a small table, which can be optimized with a MAPJOIN hint:

select /*+mapjoin(b)*/
  b.seller_star, 
  count(a.order_id) as order_cnt 
from 
  (
    select 
      order_id, 
      seller_id 
    from 
      order_table 
    where 
      day = '2020-11-25'
  ) as a 
  left outer join (
    select 
      seller_id, 
      seller_star 
    from 
      dim_seller 
    where 
      day = '2020-11-25'
  ) as b on a.seller_id = b.seller_id 
group by 
  b.seller_star;

The /*+mapjoin(b)*/ is the map join hint; to map-join multiple tables, write /*+mapjoin(b,c,d)*/. In current Hive versions, automatic conversion to map join is enabled by default through:

set hive.auto.convert.join = true;

Map join optimization performs the join in the map phase instead of distributing rows by the join column and joining on each Reduce task node, as is normally done in the reduce phase. Because there is no distribution, there is no skew. Instead, Hive replicates the small table to every map task node (for the dim_seller table, only the columns referenced by subquery b are replicated), and each map task then looks up the small table locally.

If the small table is too large, replicating it in full to every map task is no longer worth the cost. Hive uses the parameter

hive.auto.convert.join.noconditionaltask.size

to check whether the small table's size meets the requirement (25MB by default). In practice this value can be raised, but it must not exceed 1GB. Also note that the file size shown in HDFS is the compressed size; once loaded into memory, the data can grow by up to a factor of 10 in some scenarios.
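A sketch of the related settings (the size below is illustrative and must stay well under 1GB):

set hive.auto.convert.join = true;
set hive.auto.convert.join.noconditionaltask = true;
set hive.auto.convert.join.noconditionaltask.size = 100000000;  -- ~100MB small-table threshold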

4.10 Large table join large table optimization

What if the "small" table dim_seller in the map join above is actually large, say more than 1GB? Then we face the problem of joining a large table with a large table. This kind of problem is relatively complex, so this section first introduces a specific problem scenario and then presents optimization schemes based on it.

4.10.1 Problem Scenario

The problem scenario is as follows. Table A is a summary table that aggregates seller-buyer transaction information over the last N days: for each seller, the order count and order amount of each buyer in that window. To stay focused on the problem at hand, N is fixed at 90 days and the only summary value kept is the number of transactions. Table A has the fields buyer_id, seller_id, and pay_cnt_90d.

Table B is the sellers' basic information table, which contains the seller's level rating; for example, sellers are divided into six levels: S0, S1, S2, S3, S4, and S5.

The desired result is, for each buyer, the proportion of transactions done with sellers of each level, for example: some buyer: S0 10%, S1 20%, S2 30%, S3 20%, S4 10%, S5 20%.

Table B has the fields seller_id and s_level.

The developer’s first response is to join the two tables and count them:

select 
  m.buyer_id,
  sum(m.pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then pay_cnt_90d end) as pay_cnt_90d_s0,
  sum(case when m.s_level = 1 then pay_cnt_90d end) as pay_cnt_90d_s1,
  sum(case when m.s_level = 2 then pay_cnt_90d end) as pay_cnt_90d_s2,
  sum(case when m.s_level = 3 then pay_cnt_90d end) as pay_cnt_90d_s3,
  sum(case when m.s_level = 4 then pay_cnt_90d end) as pay_cnt_90d_s4,
  sum(case when m.s_level = 5 then pay_cnt_90d end) as pay_cnt_90d_s5
from 
  (
    select 
      a.buyer_id, 
      a.seller_id, 
      b.s_level, 
      a.pay_cnt_90d 
    from 
      (
        select 
          buyer_id, 
          seller_id, 
          pay_cnt_90d 
        from 
          table_A
      ) as a 
      join (
        select 
          seller_id, 
          s_level 
        from 
          table_b
      ) as b on a.seller_id = b.seller_id
  ) as m 
group by 
  m.buyer_id;

Some sellers have millions or even tens of millions of buyers within 90 days, but most sellers do not have many. When table_A and table_B are joined, the rows are distributed by seller_id, so the big sellers in table_A cause data skew.

However, this skew cannot be solved by map-joining table_B, because there are more than ten million sellers and the table is several GB in size, exceeding the 1GB limit for a map join table.

4.10.2 Scheme 1: Use a case when statement in the join

This scheme applies when the skewed values are explicit and few, for example skew caused by null values. The core idea is to spread those skewed values randomly across reducers: during the join, concatenate a random number onto the special values so they are distributed randomly. The core logic looks like this:

select a.user_id,a.order_id,b.user_id
from table_a a
join table_b b
on (case when a.user_id is null then concat('hive',rand()) else a.user_id end) = b.user_id;

Hive also provides the skewinfo and skewjoin parameters for this kind of optimization, so the SQL code does not need to be changed. For example, suppose the skew is caused by the values 0 and 1 of table_B:

set hive.optimize.skewinfo = table_b:(seller_id)[("0")("1")];
set hive.optimize.skewjoin = true;

However, this scheme cannot solve the skew in this scenario, because the number of skewed sellers is large and changes dynamically.

4.10.3 Scheme 2: Dynamically split the data in two

The ultimate solution is to split the data dynamically in two, that is, handle skewed key values and non-skewed key values separately. The non-skewed keys are joined normally; the skewed keys are extracted and map-joined, and the two results are combined with union all.

But this solution is cumbersome: the code becomes more complex and a temporary table is needed to hold the skewed keys.

The pseudocode for adopting this solution is shown below:

-- Due to data skew, first find the big sellers (more than 10,000 buyers in 90 days)
-- and store them with their level in a temporary table
insert overwrite table tmp_table_b
select
  m.seller_id,
  n.s_level
from
  (
    select
      seller_id
    from
      (
        select
          seller_id,
          count(buyer_id) as byr_cnt
        from
          table_A
        group by
          seller_id
      ) as a
    where
      a.byr_cnt > 10000
  ) m
  left outer join (
    select
      seller_id,
      s_level
    from
      table_b
  ) n on m.seller_id = n.seller_id;

For sellers with more than 10,000 buyers in 90 days, use map join directly; for the other sellers, join normally, and then merge the two results with union all.
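The second step then handles the two groups of sellers separately and merges them with union all. The following is only a structural sketch of this step (column handling is simplified; only one of the six case when columns is shown):

select
  m.buyer_id,
  sum(m.pay_cnt_90d) as pay_cnt_90d,
  sum(case when m.s_level = 0 then m.pay_cnt_90d end) as pay_cnt_90d_s0
  -- ... repeat for s_level 1 through 5 as in the original query
from
  (
    -- skewed (big) sellers: tmp_table_b is small, so map join it
    select /*+mapjoin(n)*/
      a.buyer_id,
      a.pay_cnt_90d,
      n.s_level
    from
      table_A a
      join tmp_table_b n on a.seller_id = n.seller_id
    union all
    -- all other sellers: join table_b normally, excluding the big sellers
    select
      a.buyer_id,
      a.pay_cnt_90d,
      b.s_level
    from
      table_A a
      left outer join table_b b on a.seller_id = b.seller_id
      left outer join tmp_table_b t on a.seller_id = t.seller_id
    where
      t.seller_id is null
  ) m
group by
  m.buyer_id;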