1. Modify the execution engine

The default processing engine is MapReduce. You can change the processing engine to Spark; with the Spark engine, execution speed increases by more than 10 times.
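As a minimal sketch (assuming Hive on Spark is already installed and configured on the cluster), the engine can be switched per session; hive.execution.engine also accepts mr and tez:

set hive.execution.engine=spark;

Or permanently, in hive-site.xml:

<property>
    <name>hive.execution.engine</name>
    <value>spark</value>
</property>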

2. Parameter setting

2.1 The following settings are displayed at startup

In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>

The number of reduce tasks is determined by the following three parameters:

mapred.reduce.tasks (explicitly sets the number of reduce tasks)

hive.exec.reducers.bytes.per.reducer (amount of data each reduce task processes; defaults to 1000^3 bytes = 1 GB)

hive.exec.reducers.max (maximum number of reducers per job; defaults to 999)

The formula for computing the number of reducers is simple: N = min(hive.exec.reducers.max, total input data size / hive.exec.reducers.bytes.per.reducer)
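A worked example under the defaults above (the 10 GB input size is an assumed figure):

-- Input: 10 GB; defaults: bytes.per.reducer = 1 GB, reducers.max = 999
-- N = min(999, 10 GB / 1 GB) = 10 reducers
-- Halving bytes.per.reducer doubles the reducer count:
set hive.exec.reducers.bytes.per.reducer=500000000;
-- N = min(999, 10 GB / 500 MB) = 20 reducers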

Scenarios that use only one reducer: (a) an aggregation without GROUP BY, (b) ORDER BY, (c) a Cartesian product. The ORDER BY case is sketched below.
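A quick sketch of the ORDER BY case, reusing the emp table that appears in the Fetch examples later (an assumption here):

set mapreduce.job.reduces=5;
-- A global ORDER BY must produce one totally ordered output,
-- so Hive still runs exactly 1 reducer despite the setting above
select * from emp order by deptno;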

3. Fetch

Fetch means that certain Hive queries do not need MapReduce at all. For example: SELECT * FROM employees; In this case, Hive can simply read the files in the employees storage directory and write the results to the console. In the hive-default.xml.template file, hive.fetch.task.conversion defaults to more (older Hive versions defaulted to minimal). Once this attribute is set to more, global searches, column searches, and LIMIT queries all skip MapReduce.

<property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
    <description>
      Expects one of [none, minimal, more].
      Some select queries can be converted to single FETCH task minimizing latency.
      Currently the query should be single sourced not having any subquery and should not have
      any aggregations or distincts (which incurs RS), lateral views and joins.
      0. none : disable hive.fetch.task.conversion
      1. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
      2. more  : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)
    </description>
</property>

Actual case:

1) Set hive.fetch.task.conversion to none and run the queries below; each of them launches a MapReduce job.
hive (default)> set hive.fetch.task.conversion=none;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;

2) Set hive.fetch.task.conversion to more and run the same queries; none of them launches a MapReduce job.
hive (default)> set hive.fetch.task.conversion=more;
hive (default)> select * from emp;
hive (default)> select ename from emp;
hive (default)> select ename from emp limit 3;

4. Local mode

Most Hadoop jobs need the full scalability Hadoop provides to handle large data sets. Sometimes, however, Hive's input is very small, and launching the distributed execution machinery for the query can take far longer than the actual work. In most such cases, Hive can handle the whole task on a single machine in local mode, which can shorten execution time dramatically for small data sets. Users can set hive.exec.mode.local.auto to true to let Hive apply this optimization automatically when appropriate.

set hive.exec.mode.local.auto=true;  -- Enable local MR
-- Maximum input size for local MR; when the input is smaller than this, local mode is used. Default: 134217728 (128 MB)
set hive.exec.mode.local.auto.inputbytes.max=50000000;
-- Maximum number of input files for local MR; when there are fewer files than this, local mode is used. Default: 4
set hive.exec.mode.local.auto.input.files.max=10;

Actual case:

1) Enable local mode and run the query:
hive (default)> set hive.exec.mode.local.auto=true;
hive (default)> select * from emp cluster by deptno;
Time taken: 1.328 seconds, Fetched: 14 row(s)

2) Disable local mode and run the same query:
hive (default)> set hive.exec.mode.local.auto=false;
hive (default)> select * from emp cluster by deptno;
Time taken: 20.09 seconds, Fetched: 14 row(s)


5. Table optimization

5.1 Small table join large table

Putting the table with relatively scattered keys and a small data volume on the left side of the join effectively reduces the chance of out-of-memory errors. Going further, MapJoin can load small dimension tables (fewer than about 1000 records) into memory in advance, completing the join on the map side, as sketched below. Test results show that newer Hive versions already optimize both small-table-JOIN-large-table and large-table-JOIN-small-table, so whether the small table is on the left or the right makes no obvious difference.
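As a sketch of the map-side join just mentioned, the classic way to force one is the MAPJOIN hint (the table names reuse the bigtable/smalltable created in the case below; newer Hive versions apply this automatically via hive.auto.convert.join and ignore the hint unless hive.ignore.mapjoin.hint is set to false):

select /*+ MAPJOIN(s) */ b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s join bigtable b on s.id = b.id;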

Actual case:

1. Requirement: test the efficiency of joining a large table to a small table versus joining a small table to a large table.

2. Statements that create the large table, the small table, and the post-join table:
-- Create the large table
create table bigtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the small table
create table smalltable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the post-join table
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

3. Import data into the large and small tables:
hive (default)> load data local inpath '/uardata1/hivetest/bigtable' into table bigtable;
hive (default)> load data local inpath '/uardata1/hivetest/smalltable' into table smalltable;

4. Disable MapJoin (it is enabled by default). MapJoin caches small tables in memory, so to test a reduce-side join of the large and small tables, turn it off:
set hive.auto.convert.join = false;

5. Run the small-table-JOIN-large-table statement three times:
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s left join bigtable b on b.id = s.id;

Time taken: 11.304 seconds
Time taken: 17.587 seconds
Time taken: 10.252 seconds

6. Run the large-table-JOIN-small-table statement three times:
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b left join smalltable s on s.id = b.id;

Time taken: 14.343 seconds
Time taken: 11.867 seconds
Time taken: 13.149 seconds

5.2 Large table join large table

5.2.1 Empty key filtering

Sometimes a join times out because some keys have too much data, and all rows with the same key are sent to the same reducer, exhausting its memory. In such cases we should analyze the offending keys carefully: very often they correspond to abnormal data, which should be filtered out in the SQL statement. For example, if the key field is null, proceed as follows:

(1) Create the original table, the null-id table, and the post-join table:
-- Create the original table
create table ori(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the null-id table
create table nullidtable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';
-- Create the post-join table
create table jointable(id bigint, time bigint, uid string, keyword string, url_rank int, click_num int, click_url string) row format delimited fields terminated by '\t';

(2) Load data into the two source tables:
hive (default)> load data local inpath '/uardata1/hivetest/ori' into table ori;
hive (default)> load data local inpath '/uardata1/hivetest/nullid' into table nullidtable;

(3) Test without filtering null ids:
hive (default)> insert overwrite table jointable
select n.* from nullidtable n left join ori o on n.id = o.id;

Time taken: 23.255 seconds
Time taken: 20.177 seconds

(4) Test with null ids filtered out. Note that filtering drops those rows entirely, so the sample gets smaller and the data in the other fields shrinks with it:
hive (default)> insert overwrite table jointable
select n.* from (select * from nullidtable where id is not null) n left join ori o on n.id = o.id;

Time taken: 7.946 seconds
Time taken: 7.882 seconds


5.2.2 Empty key conversion

Sometimes, although a null key carries a lot of data, those rows are not abnormal and must be included in the join result. In this case, we can assign a random value to the null-key fields in table A, so that the rows are randomly and evenly distributed across different reducers. For example:

5.2.2.1 Null values are not randomly distributed

(1) Set the number of reducers to 5:
set mapreduce.job.reduces = 5;

(2) JOIN the two tables:
insert overwrite table jointable
select n.* from nullidtable n left join ori b on n.id = b.id;

Time taken: 23.528 seconds
Time taken: 21.05 seconds

5.2.2.2 Null values are randomly distributed

(1) Set the number of reducers to 5:
set mapreduce.job.reduces = 5;

(2) JOIN the two tables, converting each null key to a random value chosen so it cannot collide with the hash distribution of the real keys:
insert overwrite table jointable
select n.* from nullidtable n full join ori o on
case when n.id is null then concat('hive', rand()) else n.id end = o.id;

5.3 MapJoin

If MapJoin is not specified, or its conditions are not met, the Hive parser converts the join into a Common Join, i.e., the join is completed in the Reduce phase, where data skew easily occurs. MapJoin instead loads the small table entirely into memory on the map side and performs the join there, avoiding reducer processing.

5.3.1 Enabling MapJoin

(1) Enable automatic MapJoin selection (default is true):
set hive.auto.convert.join = true;

(2) Set the threshold below which a table counts as small (default is 25 MB):
set hive.mapjoin.smalltable.filesize = 25000000;


5.3.2 Working mechanism of MapJoin

In brief, a local task first reads the small table and builds it into an in-memory hash table, which is uploaded to the distributed cache; the map tasks then stream the large table, probe the hash table, and emit join results directly, so no shuffle or reduce phase is needed.

5.3.3 Example

(1) Enable the MapJoin function (default is true):
set hive.auto.convert.join = true;

(2) Run the small-table-JOIN-large-table statement:
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from smalltable s join bigtable b on s.id = b.id;

Time taken: 4.817 seconds

(3) Run the large-table-JOIN-small-table statement:
insert overwrite table jointable
select b.id, b.time, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable b join smalltable s on s.id = b.id;

Time taken: 5.915 seconds

Compared with the results in 5.1, the elapsed time is roughly halved, and the relative position of the small and large tables makes no difference.

5.4 Group By

By default, the Map phase distributes all rows with the same key to one reducer; when a single key's data is too large, the data skews. Not all aggregations need to happen on the Reduce side, though: for many aggregations, partial aggregation can be done on the Map side first, and the final result assembled on the Reduce side.

5.4.1 Parameters for enabling map-side aggregation

(1) Whether to aggregate on the Map side (default is true):
set hive.map.aggr = true;

(2) Number of rows over which map-side aggregation operates (default is 100000):
set hive.groupby.mapaggr.checkinterval = 100000;

(3) Perform load balancing when data is skewed (default is false):
set hive.groupby.skewindata = true;

When this option is set to true, the resulting query plan has two MR jobs. In the first MR job, map output is distributed randomly among the reducers; each reducer performs a partial aggregation and emits its result, so rows with the same Group By key may land on different reducers, which balances the load. The second MR job then distributes the pre-aggregated results to reducers by Group By key (guaranteeing that identical keys reach the same reducer) and completes the final aggregation.
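A short sketch of these parameters in action, reusing the emp table from the Fetch examples (treating deptno as the skewed grouping key is an assumption):

set hive.map.aggr = true;
set hive.groupby.skewindata = true;
-- Compiles into two MR jobs: one partially aggregates randomly
-- distributed map output, the next finishes the aggregation by deptno
select deptno, count(*) from emp group by deptno;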

5.5 Count(Distinct) deduplication

With a small data volume this does not matter much. With a large volume, however, a COUNT DISTINCT must be completed by a single Reduce task, and the amount of data that one task handles makes the whole job hard to finish. COUNT DISTINCT is therefore usually replaced by a GROUP BY followed by a COUNT:

5.5.1 Case 1

1. Create the large table:
hive (default)> create table bigtable(id bigint, time bigint, uid string, keyword
string, url_rank int, click_num int, click_url string) row format delimited
fields terminated by '\t';

2. Load the data:
hive (default)> load data local inpath '/uardata1/hivetest/bigtable' into table bigtable;
 
3. Set the number of reducers to 5:
set mapreduce.job.reduces = 5;

4. Deduplicate with COUNT DISTINCT:
hive (default)> select count(distinct id) from bigtable;
OK
99947
Time taken: 3.284 seconds, Fetched: 1 row(s)

5. Deduplicate ids with GROUP BY instead:
hive (default)> select count(id) from (select id from bigtable group by id) a;
OK
99947
Time taken: 3.28 seconds, Fetched: 1 row(s)

This takes one extra job to complete, but with a large data volume it is definitely worth it.

5.6 Cartesian product

Avoid Cartesian products: never omit the ON condition, or use an invalid one, when joining. Hive can only use a single reducer to complete a Cartesian product, as the sketch below illustrates.
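A sketch of the difference, reusing bigtable and smalltable from 5.1:

-- A join with no join condition is a Cartesian product: every row of
-- bigtable pairs with every row of smalltable, and Hive funnels all
-- of it through a single reducer (strict mode rejects this outright)
select * from bigtable b cross join smalltable s;
-- An equi-join with a valid ON condition distributes the work normally
select * from bigtable b join smalltable s on b.id = s.id;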

5.7 Row and Column Filtering

  • Column handling: in SELECT, take only the columns you need; when partitions exist, filter on them as much as possible, and avoid SELECT *.
  • Row handling: for partition pruning, when using an outer join, if the filter condition on the secondary table is written after WHERE, the whole table is joined first and only filtered afterwards; filter inside a subquery instead. For example:

5.7.1 Case 1

1. Test: join the two tables first, then filter with WHERE:
hive (default)> select o.id from bigtable b
join ori o on o.id = b.id
where o.id <= 10;
Time taken: 3.282 seconds, Fetched: 100 row(s)

2. Filter in a subquery first, then join the tables:
hive (default)> select b.id from bigtable b
join (select id from ori where id <= 10) o on b.id = o.id;
Time taken: 4.232 seconds, Fetched: 100 row(s)


5.8 Dynamic partition adjustment

In a relational database, when you insert data into a partitioned table, the database automatically places each row into the correct partition based on the value of its partition column. Hive provides a similar mechanism, called dynamic partitioning.

5.8.1 Parameter settings for dynamic partitioning

(1) Enable the dynamic partition function (default is true):
hive.exec.dynamic.partition = true
(2) Dynamic partition mode. The default, strict, requires at least one partition column to be specified as a static partition; nonstrict allows all partition columns to use dynamic partitioning:
hive.exec.dynamic.partition.mode = nonstrict
(3) Maximum number of dynamic partitions that can be created across all MR nodes:
hive.exec.max.dynamic.partitions = 1000
(4) Maximum number of dynamic partitions that can be created on each node executing MR. Set this according to the actual data: for example, if the source data covers a whole year and the partition field is day, the value must exceed 365; with the default of 100 the job errors out:
hive.exec.max.dynamic.partitions.pernode = 100
(5) Maximum number of HDFS files a single MR job may create:
hive.exec.max.created.files = 100000
(6) Whether to throw an exception when an empty partition is generated; usually this needs no setting:
hive.error.on.empty.partition = false

5.8.2 Case 1

Requirement: insert the data in ori into the corresponding partitions of ori_partitioned_target by time (for example, 20111230000008).

(1) Create the partitioned source table:
create table ori_partitioned(id bigint, time bigint, uid string, keyword string,
 url_rank int, click_num int, click_url string)
partitioned by (p_time bigint)
row format delimited fields terminated by '\t';

(2) Load data into the partitioned table:
hive (default)> load data local inpath '/uardata1/hivetest/ds1' into table ori_partitioned partition(p_time='20200730000010');
hive (default)> load data local inpath '/uardata1/hivetest/ds2' into table ori_partitioned partition(p_time='20200730000011');

(3) Create the target partitioned table:
create table ori_partitioned_target(id bigint, time bigint, uid string,
 keyword string, url_rank int, click_num int, click_url string)
PARTITIONED BY (p_time STRING)
row format delimited fields terminated by '\t';

(4) Set up dynamic partitioning and run the insert:
set hive.exec.dynamic.partition = true;
set hive.exec.dynamic.partition.mode = nonstrict;
set hive.exec.max.dynamic.partitions = 1000;
set hive.exec.max.dynamic.partitions.pernode = 100;
set hive.exec.max.created.files = 100000;
set hive.error.on.empty.partition = false;

hive (default)> insert overwrite table ori_partitioned_target partition (p_time)
select id, time, uid, keyword, url_rank, click_num, click_url, p_time from ori_partitioned;

(5) Check the partitions of the target table:
hive (default)> show partitions ori_partitioned_target;
