Preface
Tuning guidelines
- Hive is a commonly used big data component. Its performance is affected by data skew, data redundancy, excessive jobs and I/O, and improper MapReduce allocation. We can therefore tune it at the table design level, the HiveSQL level, the component configuration level, and the underlying MapReduce level. In short, the goal is to reduce wasted resources and task running time as much as possible while keeping the business results unchanged.
- Hive SQL is converted into multiple MapReduce jobs at the underlying layer and then executed. A query that spawns many jobs is naturally slow: even with a small amount of table data, each MapReduce job takes a long time to initialize, so the query still takes a long time.
- When Hive is used for big data analysis, map-side local aggregation and two-stage aggregation can solve the common data skew caused by sum, count, and max/min UDAFs. Skew caused by joins, however, is more complicated to handle.
- Good table design is itself an optimization.
- Setting a reasonable level of task parallelism can improve performance.
- It is also a good idea to understand the distribution of the data and deal with it accordingly.
- count(distinct) over a large amount of data can be problematic; rewriting it as a group by plus an aggregate makes the problem much less serious.
- Merge small files. And remember that tuning exists precisely because resources are limited; none of these techniques is as simple and brute-force as just adding resources. If you can simply add machines, don’t overthink it 😂
1. Table design level
1.1 Partitioned tables and bucketed tables
We’ve all heard of internal tables, external tables, partitioned tables, and bucketed tables. Internal and external tables have no difference in performance. A partitioned table stores data split along a certain dimension, with each partition corresponding to a directory. When a query filters on the partition field, Hive only needs to traverse the corresponding partition directory (really just an HDFS directory), which reduces the amount of data to be processed. For example:
select * from day where day = 'yesterday'
Many of our Hive tables are partitioned by day. If the table is not built as a partitioned table, a query still traverses all of the data and then filters out yesterday’s records; if it is a partitioned table, the query goes straight to yesterday’s folder and reads from there.
So if a field is used for where filtering most of the time, remember to make the table a partitioned table on that field.
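A minimal sketch of such a table; the table and column names (user_log, user_id, action) are illustrative assumptions, not from the original example:

-- a minimal sketch; table and column names are illustrative
create table user_log (
  user_id bigint,
  action  string
)
partitioned by (day string)
stored as orc;

-- only the '2021-01-01' partition directory is scanned
select * from user_log where day = '2021-01-01';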
Bucketing is a slightly different concept: data is hashed into a specified number of buckets using the value of a specified column as the key. The idea is again to avoid scanning all of the data. For example:
select a.*,b.* from a join b on a.id = b.id
If a and b are both large tables, the execution efficiency of this task is bound to be a problem, and data skew may also appear. If table a’s id is often used in join queries, table a should be designed as a bucketed table with id as the bucketing column.
Bucketing also makes it easier to sample the data later.
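A minimal sketch of bucketing table a on id; the bucket count of 32 and the column list are illustrative assumptions:

-- a minimal sketch; bucket count and columns are illustrative
create table a (
  id   bigint,
  name string
)
clustered by (id) into 32 buckets
stored as orc;

-- sampling then only needs to read one bucket instead of the whole table
select * from a tablesample (bucket 1 out of 32 on id);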
As an aside, Doris, which is also an OLAP engine, has a colocation join. In the example above, the bucket join actually hides two conditions: both tables a and b must be bucketed tables, and their bucket counts must be multiples of each other. In Doris, the bucket counts are required to be equal, which guarantees that rows with the same bucket number in the two tables live on the same node.
Incidentally, Doris executes SQL much more efficiently than Hive. Common OLAP engines share these advantages: columnar storage + range partitioning + sorting + indexes + compression + pre-aggregation + built-in handling of data skew.
1.2 Appropriate storage format
The default Hive storage format is TextFile. Suppose we have a table student(id, name, sex, department, ...) with 100 fields, and run:
select department,count(*) as total from student group by department
Clearly this query uses no column other than department, so with a columnar store we could avoid dragging along all of the student fields every time we read a row. Conceptually, if the 100 attributes of the student table were stored in 100 separate files, only the department file would need to be read to answer this query.
For wide tables, use columnar formats such as ORC or Parquet whenever possible: values of the same column are stored together physically, so in this example only 1/100 of the data has to be read.
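A minimal sketch of storing the student table as ORC; the table name student_orc is an assumption and the column list is abbreviated:

-- a minimal sketch; only a few of the 100 columns are shown
create table student_orc (
  id         int,
  name       string,
  sex        string,
  department string
  -- ... remaining columns omitted
)
stored as orc;

-- this aggregation now reads only the department column
select department, count(*) as total from student_orc group by department;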
1.3 Suitable compression format
To choose a compression algorithm, first determine whether the task is I/O-intensive or compute-intensive. Compression is only worth considering when the task is I/O-intensive: we are spending CPU to save network and disk I/O, so we must make sure CPU resources are sufficient. If the task is compute-intensive, don’t put any more load on the CPU.
The choice of algorithm then depends on compression ratio, compression/decompression speed, and whether the format is splittable. A compressed file roughly looks like this:
header + body
The header stores metadata and the body holds the real data. The header records the data size before and after compression, the compression algorithm used, and so on; only with that information can the file be decompressed correctly. Splittable versus non-splittable refers to whether, after the original data has been split into blocks, each block keeps its own copy of the header metadata, so that every block can still be decompressed on its own later.
| Compression format | Splittable | Native library | Compression ratio | Speed | Bundled with Hadoop |
|---|---|---|---|---|---|
| gzip | no | yes | high | relatively fast | yes |
| lzo | yes | yes | relatively high | fast | no |
| snappy | no | yes | relatively high | fast | no |
| bzip2 | yes | no | highest | slow | yes |
1.3.1 Compression Use
Compress the job output files with Gzip at the block level:
## default is false
set mapreduce.output.fileoutputformat.compress=true;
## default is RECORD
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
## default is org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
Map output is also compressed with Gzip:
## enable compression of map output
set mapred.map.output.compress=true;
## default is org.apache.hadoop.io.compress.DefaultCodec
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
Compress both the Hive final output and the intermediate results:
## default is false: compress the final output
set hive.exec.compress.output=true;
## default is false: compress intermediate results; only takes effect when the MR compression settings are also enabled
set hive.exec.compress.intermediate=true;
2. HQL syntax and running parameters
2.1 Viewing Hive Execution Plans
To understand how a query gets converted, prefix the SQL statement with the following keyword to view its execution plan:
explain [extended] query   ## add the extended keyword for a more detailed execution plan
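For example, assuming the student table from section 1.2, the plan of the aggregation above can be inspected like this:

explain extended
select department, count(*) as total
from student
group by department;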
2.2 Column pruning and partition pruning
Column pruning means reading only the required columns when querying; partition pruning means reading only the required partitions. So avoid select *, and always specify partitions.
When Hive reads data it can read only the columns required by the query and ignore the rest, which saves read overhead, intermediate table storage overhead, and data consolidation overhead.
set hive.optimize.cp=true;   ## read only the columns needed by the query; default is true
Partition clipping is reading only the required partitions.
set hive.optimize.pruner=true;   ## default is true
The ColumnPruner logic optimizer corresponds to the HiveQL parsing phase.
2.3 Predicate pushdown
A lot of people don’t understand this concept, so let me illustrate it with an example.
Suppose 10,000 student records are spread across four compute nodes and we only want students older than 25. If the where filter is applied only at the client, every node ships all of its data over the network; if only a single student turns out to be over 25, the other 9,999 rows were transmitted for nothing. Obviously it is better to apply the where filter on each node first, so that far less data is sent to the client.
That is what predicate pushdown means: in this example, the filtering is pushed down to the compute nodes. The HBase coprocessor works on the same principle.
Execute all where predicates in the SQL statement as early as possible to reduce the amount of data processed downstream. The corresponding logical optimizer is PredicatePushDown.
set hive.optimize.ppd=true;   ## default is true
Example:
select a.*, b.* from a join b on a.id = b.id where b.age > 20;
-- with predicate pushdown, the filter on b is applied before the join:
select a.*, c.* from a join (select * from b where age > 20) c on a.id = c.id;
2.4 Merging small Files
2.4.1 Map Input Merging
When a MapReduce program runs, each data block of a file is handled by one MapTask. If the input consists of a huge number of small files, a huge number of MapTasks are started, wasting a lot of resources. Merging the small input files reduces the number of MapTasks.
To enable input merging, set:
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
If you do not want to merge:
set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;
Note The default Hive format is TextInputFormat
2.4.2 Merge Map/Reduce output
A large number of small files bring pressure to the HDFS, affecting the processing efficiency. You can eliminate the impact by merging Map and Reduce result files.
## merge the output files of map-only jobs
set hive.merge.mapfiles=true;
## merge the output files of map-reduce jobs
set hive.merge.mapredfiles=true;
## target size of the merged files
set hive.merge.size.per.task=256000000;
## maximum split size
set mapred.max.split.size=256000000;
## minimum split size on a single node
set mapred.min.split.size.per.node=1;
## minimum split size on a single rack
set mapred.min.split.size.per.rack=1;
For example, with mapred.min.split.size.per.node=1: if the current node has 300 small files of 1MB each and the split size is 256MB, the remaining 44MB is transferred to another node for merging. But if the node has 500 1MB files, 244MB is left over; in that case setting mapred.min.split.size.per.node to a larger value would be more reasonable.
2.5 Properly Setting the Parallelism of MapTask
The data processed in the Mapper phase may be raw input data or the output of a previous job. Let’s break it down, starting with raw data, which is the easier case to control.
If the input files are huge and a large number of MapTasks land on each compute node, you can reduce the number of MapTasks so that each one processes more data; too many MapTasks also means too many result files. Conversely, if the input files are large, the task logic is complex, and MapTasks run slowly, you can increase the number of MapTasks so that each one processes less data and the job finishes faster.
As we know, the number of MapTasks of a MapReduce job is determined by the input splits, and input splits are computed by FileInputFormat.getSplits(). One input split corresponds to one MapTask, and the split size is determined by three parameters:
| Parameter | Default | Meaning |
|---|---|---|
| dfs.blocksize | 128M | Default HDFS block size |
| mapreduce.input.fileinputformat.split.minsize | 1 | Minimum split size (MR) |
| mapreduce.input.fileinputformat.split.maxsize | 256M | Maximum split size (MR) |
If none of this sounds familiar, I’ll cover it in another post later. For raw input data, the split size is computed as follows:
long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))
In addition, the adjusted splitSize should be an integer multiple of blockSize; otherwise, as mentioned above, a lot of leftover data ends up being transferred to other nodes for merging.
By default the split size equals the HDFS block size, i.e. one MapTask per data block, which avoids shipping data between server nodes and improves processing efficiency.
2.5.1 Properly Controlling the Number of MapTasks
1. If you want to increase the number of mapTasks, set mapred.map.tasks to a larger value
2. If you want to reduce the number of mapTasks, set mapred.min.split.size to a larger value
3. If you want to reduce the number of mapper files, set hive.input.format to merge small files
MapReduce provides the following parameters to control the number of Map tasks:
## total input file size: total_size
## HDFS block size: dfs_block_size
default_mapper_num = total_size / dfs_block_size
You can set mapred.map.tasks to specify the number of MapTasks, but it only takes effect when it is greater than default_mapper_num:
set mapred.map.tasks=10;   ## default is 2
So what if we need to reduce the number of MapTasks, but the file size is fixed?
mapred.min.split.size sets the minimum amount of data each task processes; it only takes effect when it is greater than dfs_block_size:
split_size = max(mapred.min.split.size, dfs_block_size)
split_num = total_size / split_size
map_num = Math.min(split_num, Math.max(default_mapper_num, mapred.map.tasks))
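A worked example under assumed numbers: with 10GB of input and a 128MB block size, default_mapper_num ≈ 80. Setting mapred.min.split.size to 256MB gives split_size = max(256MB, 128MB) = 256MB, split_num = 10GB / 256MB = 40, and map_num = min(40, max(80, 2)) = 40, so the number of MapTasks is halved while each one processes twice as much data.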
2.6 Reasonably set the ReduceTask parallelism
If there are too many ReduceTasks, each ReduceTask produces one result file, generating lots of small files; if those files are the input of the next job, the small-file merging problem appears again. Starting and initializing ReduceTasks also consumes resources.
If there are too few ReduceTasks, each one has to process a large amount of data, data skew may appear, and the whole query becomes slow. So how Hive decides the number of ReduceTasks is a key question. Unfortunately, Hive’s estimation mechanism is weak: if you do not specify the number of ReduceTasks, Hive guesses one based on the following parameters:
## parameter 1: maximum amount of data processed by each ReduceTask
hive.exec.reducers.bytes.per.reducer    (default 256M)
## parameter 2: upper limit on the number of ReduceTasks
hive.exec.reducers.max                  (default 1009)
## parameter 3: explicitly set number of ReduceTasks
mapreduce.job.reduces                   (default -1, meaning not set)
If you set mapreduce.job.reduces, the number of ReduceTasks is the value you set (in scenarios without order by); otherwise it is estimated as:
ReduceTask_num = Math.min(parameter 2, total input data size / parameter 1)
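A worked example with assumed numbers: with 100GB of input and parameter 1 left at 256MB, total / parameter 1 = 400, so ReduceTask_num = min(1009, 400) = 400; raising hive.exec.reducers.bytes.per.reducer to 1G would bring it down to 100.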
As a rule of thumb, parameter 2 can be set to M * (0.95 * N), where N is the number of NodeManagers in the cluster (usually the same as the number of DataNodes). The factor 0.95 leaves some headroom for fault tolerance, since a node may go down.
2.7 Join optimization
2.7.1 Overall Principles
1. Filter before joining, to minimize the amount of data participating in the join.
At every stage, minimize the data volume: use the partition fields of partitioned tables wherever possible, and select only the columns that will actually be used later, so that as little data as possible participates in the join.
2. When a small table joins a large table, use MapJoin. Hive enables MapJoin automatically; the size threshold of the small table defaults to 25M and can be changed.
3. When joining, follow the principle of putting the small table on the left. In the Reduce phase of a join, the contents of the table on the left side of the join are loaded into memory, so putting the table with fewer rows on the left effectively reduces the chance of running out of memory. In a multi-table join, jobs are generated from left to right, so make sure the table sizes increase from left to right in the query.
If the join on conditions are the same, it is better to put the joins into the same job, and order the joined tables from small to large:
select a.*,b.*, c.* from a join b on a.id = b.id
join c on a.id = c.i
4. When multiple tables are joined and the join conditions are the same, the joins are converted into a single job.
In Hive, if three or more tables are joined and the on clauses use the same field, they are merged into one MapReduce job. Taking advantage of this, you can put joins with the same join key into one job to save execution time.
5. Try to avoid SQL statements that contain complex logic; if you cannot, use intermediate tables to break the logic into steps.
When a large table joins another large table, we also have the following two measures:
1. Null key filtering: sometimes a join times out because certain keys have far too much data, and all rows with the same key are sent to the same reducer, exhausting its memory. In many cases these keys are null or otherwise abnormal and the corresponding rows are invalid data; analyze them carefully and filter them out in the SQL statement.
2. Null key conversion: sometimes a null key has a lot of rows, but those rows are not abnormal and must appear in the join result. In that case, assign a random value to the null join field of table a so that its data is spread randomly and evenly across different reducers.
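A minimal sketch of the null-key conversion, reusing tables a and b joined on id; the nvl/rand expression is just one common way to write it:

-- rows of a with a null id get a random, non-matching key so they spread evenly across reducers
select a.*, b.*
from a
left join b
  on nvl(a.id, rand()) = b.id;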
2.7.2 MapJoin
MapJoin loads the small table of the join directly into the memory of every Map process, so the join is completed in the Map phase; there is no Reduce phase at all, which speeds things up. MapJoin applies only to join operations.
## whether to automatically convert a common (reduce-side) join into a map join based on
## the size of the input small table, loading the small table into memory
set hive.auto.convert.join=true;
## size (in bytes) of the table loaded into memory, i.e. the small table can be at most 25M
## by default; we generally raise it, up to about 2G
set hive.mapjoin.smalltable.filesize=25000000;
## let hive automatically convert a common join into a mapjoin based on table size
set hive.auto.convert.join.noconditionaltask=true;
## table size that triggers the conversion and the local task; default 10M, because some tasks
## are more efficient in single-machine (local) mode
set hive.auto.convert.join.noconditionaltask.size=10000000;
You can also manually enable MapJoin yourself, using a specific syntax
/*+mapjoin(smalltable)*/
SELECT /*+ MAPJOIN(smallTable) */ smallTable.key, bigTable.value FROM
smallTable JOIN bigTable ON smallTable.key = bigTable.key;
2.7.3 Sort Merge Bucket (SMB) Map Join
The prerequisite for this technique is that all joined tables are bucketed tables and are sorted within buckets. In summary there are two requirements:
1. The two joined tables are bucketed with the same hash on the join key, and the data within each bucket is sorted;
2. The bucket counts of the two tables must be multiples of each other.
## when the user asks for a bucket map join and it cannot be executed, fail the query
set hive.enforce.sortmergebucketmapjoin=false;
## whether to automatically convert a join into a sort-merge join when the joined tables
## satisfy the sort-merge join conditions
set hive.auto.convert.sortmerge.join=true;
## if the bucket counts of the two tables are multiples of each other, bucket map join can be
## enabled to improve efficiency; bucket map join optimization, default false
set hive.optimize.bucketmapjoin=false;
## sorted-merge bucket map join optimization, default false
set hive.optimize.bucketmapjoin.sortedmerge=false;
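A minimal sketch of two tables that meet the SMB requirements; the table names, columns, and bucket count of 16 are illustrative assumptions:

-- both tables bucketed and sorted on the join key, with matching bucket counts
create table a_bucketed (id bigint, name  string)
clustered by (id) sorted by (id) into 16 buckets
stored as orc;

create table b_bucketed (id bigint, value string)
clustered by (id) sorted by (id) into 16 buckets
stored as orc;

-- with the parameters above enabled, this can run as a sort-merge-bucket map join
select x.*, y.*
from a_bucketed x
join b_bucketed y on x.id = y.id;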
2.8 Join Data skew optimization
When writing a join query, if you know the join causes data skew, set the following parameters:
## number of rows per key above which the key is considered skewed
set hive.skewjoin.key=100000;
## enable skew join optimization; default is false
set hive.optimize.skewjoin=false;
With this enabled, keys whose row count exceeds the hive.skewjoin.key threshold (100000 by default) are written to a temporary file during the join, and a second job is started to run a map join on them and produce the final result.
The hive.skewjoin.mapjoin.map.tasks parameter controls the number of mappers of that second job, 10000 by default.
set hive.skewjoin.mapjoin.map.tasks=10000;
2.9 CBO optimization
CBO stands for cost-based optimizer: the cheapest execution plan is considered the best one. In traditional databases the cost optimizer builds the optimal plan from statistics, and Hive’s cost optimizer works the same way. Join order matters: the tables listed earlier are loaded into memory, while the later tables are scanned from disk.
Since version 0.14.0, Hive has included a cost-based optimizer to optimize the HQL execution plan, controlled by the hive.cbo.enable parameter and enabled by default since Hive 1.1.0. It automatically optimizes the order of multiple joins in an HQL statement and selects an appropriate join algorithm.
For example, when several joins in the same statement all use conditions on the same table:
select a.*, b.*, c.* from a join b on a.id = b.id join c on a.id = c.id;
To use this feature, turn on the following parameters:
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
2.10 Cartesian products
When Hive runs in strict mode (hive.mapred.mode=strict), Cartesian products are not allowed in HQL statements.
2.11 Group By Optimization
By default, all rows with the same key in the Map phase are sent to one reducer; if one key has too much data, data skew occurs. The group by operation can be optimized in two ways.
2.11.1 Partial Map Aggregation
In fact, not all aggregation operations need to be performed on the Reduce side. Many aggregation operations can be performed on the Map side first, and then the final result can be obtained on the Reduce side.
## enable map-side aggregation
set hive.map.aggr=true;
## threshold of pre-aggregated rows on the map side; above this value the job is split; default 100000
set hive.groupby.mapaggr.checkinterval=100000;
2.11.2 Load Balancing when Data Skew Occurs
If group by in an HQL statement causes data skew and this variable is set to true, Hive automatically performs load balancing. The strategy is to split the work into two MapReduce jobs: the first does a partial aggregation, and the second produces the final aggregation.
set hive.groupby.skewindata=false;   ## default is false
When the option is set to true, the generated query plan has two MapReduce jobs.
1. In the first MapReduce job, map output is distributed randomly across the reducers; each reducer performs a partial aggregation and outputs the result, achieving load balancing;
2. The second MapReduce job then distributes the pre-aggregated results to the reducers by the group by key and completes the final aggregation.
It looks complicated, but if you actually draw a picture it’s easy to understand this two-stage convergence
After this operation, if there is still data skew, it will not be too serious.
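For reference, the same two-stage idea can be written by hand with a salted key; a minimal sketch on the student/department example (the salt factor of 10 is an arbitrary assumption):

-- stage 1: aggregate on (department, salt) so a hot department is spread over 10 groups
-- stage 2: strip the salt and sum the partial counts
select department, sum(cnt) as total
from (
  select department, salt, count(*) as cnt
  from (
    select department, floor(rand() * 10) as salt from student
  ) s
  group by department, salt
) t
group by department;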
2.12 Order By optimization
order by can only be executed in a single Reduce process, so running order by on a large data set forces one reducer to process a huge amount of data and makes the query slow.
1. Run order by only on the final result set, and avoid sorting large intermediate data sets. If the final result is small enough to be sorted by one reducer, do the order by there.
2. If you only need the top N rows, use distribute by and sort by to sort within each reducer and take the top N from each; then merge the per-reducer result sets and do a global order by in a single reducer to take the final top N. The data participating in the global order by is at most (number of reducers * N) rows, so execution is much faster.
All of this is easy to understand. Hive provides four syntax types for sorting data. You must distinguish the use modes and application scenarios of the four types of sorting. I’m not going to expand it here
## 1. order by: global ordering; only one reduceTask does the sort
## 2. sort by: local ordering; the data within each reduceTask's output is ordered
## 3. distribute by + sort by: distribute by ensures that rows with the same field value go to a
##    single result file, combined with sort by to guarantee the order of each reduceTask's result
## 4. cluster by: equivalent to distribute by + sort by on the same column (ascending only)
2.13 Count Distinct Optimization
When deduplicating and counting a column, count(distinct) is very slow if the data volume is large, for a reason similar to order by: the count(distinct) logic ends up being handled by a single ReduceTask. In that case we can rewrite it as a group by.
Before optimization, a single ReduceTask performs the count(distinct) directly:
-- before optimization
select count(distinct id) from student;
-- after optimization: deduplicate with group by first, then count
select count(1) from (select id from student group by id) tmp;
However, the rewritten form starts two MR jobs (the distinct form starts only one), so this rewrite is only recommended when the data volume is large enough that the extra job-startup overhead is outweighed by the saved computation time. When the data set is small, or the key skew is obvious, group by may actually be slower than distinct.
2.14 in / exists
Early Hive versions did not support the in/exists syntax; it has been supported since hive-0.8.x, but it is still not recommended. Although testing shows that hive-2.3.6 also supports in/exists, the recommended efficient alternative in Hive is left semi join, which I will not expand on here.
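A minimal sketch of the rewrite, reusing the a/b tables from the join examples above (the name column is an assumption):

-- in/exists style
select a.id, a.name from a where a.id in (select b.id from b);
-- recommended rewrite with left semi join
select a.id, a.name from a left semi join b on a.id = b.id;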
2.15 Using Vectorization (good to know)
Vectorized execution processes scans, filters, and aggregations in batches of 1024 rows at a time, which is more efficient than processing a single row at a time.
set hive.vectorized.execution.enabled=true ;
set hive.vectorized.execution.reduce.enabled=true;
Hive has a lot of features that help you just by flipping a switch, haha.
2.16 Multi-insert mode
When you need to read a table once and insert the results into multiple destinations, use the multi insert syntax (a sketch follows the limitations below). However, multi insert has some limitations:
1. In general, a single SQL statement can contain at most 128 output branches; writing more than 128 raises a syntax error.
2. In a multi insert, the same destination partition of a partitioned table may not appear more than once; for a non-partitioned table, the table itself may not appear more than once.
3. For different partitions of the same partitioned table, do not mix insert overwrite and insert into in the same statement; otherwise an error is reported.
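A minimal sketch of the multi insert syntax, reusing the student table; the destination table, its partitions, and the age column are illustrative assumptions:

from student
insert overwrite table student_age_stat partition (stat='max')
  select department, max(age) group by department
insert overwrite table student_age_stat partition (stat='min')
  select department, min(age) group by department;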
There is also multi group by; look it up if you are interested.
2.17 Compression
2.17.1 Map Output Compression
set mapreduce.map.output.compress=true;
set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;
2.17.2 Compression of Intermediate Data
Intermediate data compression compresses the data passed between the multiple jobs of a Hive query. It is best to pick a compression method that saves CPU time, such as the Snappy algorithm, which compresses and decompresses very efficiently.
set hive.exec.compress.intermediate=true;
set hive.intermediate.compression.codec=org.apache.hadoop.io.compress.SnappyCodec;
set hive.intermediate.compression.type=BLOCK;
2.17.3 Result Data Compression
The final output data can also be compressed; choose an algorithm that reduces both the data size on disk and the disk read/write time. Note: the commonly used Gzip and Snappy formats are not splittable, so if the data source is a huge gzip/snappy compressed file, a single mapper has to process the whole file and query efficiency suffers badly. Therefore, if the result data will serve as the data source for other query tasks, you can choose the splittable LZO algorithm, so that the result files are both compressed and processable in parallel, which greatly improves job execution speed.
set hive.exec.compress.output=true;
set mapreduce.output.fileoutputformat.compress=true;
set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec;
set mapreduce.output.fileoutputformat.compress.type=BLOCK;
2.17.4 Compression Algorithms supported by Hadoop Clusters
org.apache.hadoop.io.compress.DefaultCodec
org.apache.hadoop.io.compress.GzipCodec
org.apache.hadoop.io.compress.BZip2Codec
org.apache.hadoop.io.compress.DeflateCodec
org.apache.hadoop.io.compress.SnappyCodec
org.apache.hadoop.io.compress.Lz4Codec
com.hadoop.compression.lzo.LzoCodec
com.hadoop.compression.lzo.LzopCodec
3. Hive architecture level
3.1 Enabling local fetch
In theory every Hive SQL statement needs to be converted into a MapReduce job, but in practice some statements can be executed without MapReduce at all.
Thanks to optimizations Hive applies while converting SQL statements, some simple operations do not need to be converted into MapReduce. For example:
select * from student;
Hive reads data from HDFS in two ways: by launching a MapReduce job, or by fetching the data directly. Direct fetch is much faster than reading through MapReduce, but only a few kinds of operation qualify for it. The hive.fetch.task.conversion parameter controls in which situations direct fetch is used:
## minimal: direct fetch only for select *, where filters on partition fields, and limit
## more:    direct fetch for select (any columns), where filters, and limit
set hive.fetch.task.conversion=minimal/more;
3.2 Local mode execution
By default a Hive query runs across multiple servers in the cluster, which is what makes querying large amounts of data possible.
However, when the amount of data in Hive query processing is small, it is unnecessary to enable distributed query execution because distributed design, cross-network transmission, and multi-node coordination consume resources. For small data sets, all tasks can be processed on a single machine in local mode, with significantly reduced execution time.
There are three parameters involved in starting local mode
## let hive automatically decide whether to run the job in local mode
set hive.exec.mode.local.auto=true;
## maximum number of map tasks; jobs needing more will not use local mode
set hive.exec.mode.local.auto.input.files.max=4;
## maximum input size; jobs with larger input will not use local mode
set hive.exec.mode.local.auto.inputbytes.max=134217728;
In other words: if the job needs fewer than 4 map tasks and its input is under 128MB, Hive will just run it locally.
3.3 JVM reuse
Hive statements are converted into a series of MapReduce jobs, each consisting of MapTasks and ReduceTasks. By default, each MapTask or ReduceTask starts its own JVM process, which exits when the task finishes. If tasks are short and JVMs have to be started over and over, JVM startup can become a significant cost; JVM reuse solves this.
set mapred.job.reuse.jvm.num.tasks=5;
Disadvantages: Enabling JVM reuse can tie up slots for used tasks for reuse until the Task is complete. If several Reduce tasks in an unbalanced job spend much more time executing than other Reduce tasks, the reserved slots will remain idle and cannot be used by other jobs until all tasks are finished.
3.4 Parallel Execution
Hive converts a query into one or more stages: MapReduce stages, sampling stages, merge stages, limit stages, and so on. By default only one stage runs at a time, but stages that do not depend on each other can run in parallel. Note that multi-stage parallelism consumes more resources.
A Hive SQL statement may be converted into multiple MapReduce jobs; each job is a stage, and by default these jobs run one after another, which you can also see in the CLI run logs. Sometimes, though, these jobs do not depend on each other, and if cluster resources allow, the independent stages can run concurrently, saving time and speeding up execution. If cluster resources are tight, however, enabling parallelism makes jobs fight each other for resources and actually degrades overall performance. To enable parallel execution:
## enable parallel execution of independent stages
set hive.exec.parallel=true;
## maximum degree of parallelism allowed for one SQL statement; default is 8
set hive.exec.parallel.thread.number=16;
3.5 Speculative Execution
In a distributed cluster, program bugs (including bugs in Hadoop itself), load imbalance, or uneven resource distribution can make tasks of the same job run at very different speeds; some tasks may run noticeably slower than the rest (for example, one task of a job is only at 50% progress while all the others have finished), and they drag down the whole job. To prevent this, Hadoop uses a speculative execution mechanism: it detects lagging tasks according to certain rules and launches a backup task that processes the same data in parallel with the original, keeping the result of whichever finishes first.
## enable speculative execution in the mapper phase
set mapreduce.map.speculative=true;
## enable speculative execution in the reducer phase
set mapreduce.reduce.speculative=true;
If you are sensitive to run-time variance, you can turn these features off. And if a MapTask or ReduceTask runs for a long time simply because the input data is huge, speculative execution wastes an enormous amount of resources.
3.6 Hive Strict Mode
In strict mode, users are not allowed to execute risky HiveQL statements. To improve the execution efficiency of SQL statements in Hive, you can set strict mode to make full use of Hive features.
set hive.mapred.mode=strict;
set hive.exec.dynamic.partition.mode=nostrict;
Note: When set to strict mode, the following restrictions apply:
1. For partitioned tables, the where clause must filter on a partition field; a query such as select * from student_ptn where age > 25 is rejected if it has no partition filter. 2. order by must be used together with limit, e.g. select * from student_ptn where age > 25 order by age limit 100; 3. Cartesian products are not allowed, e.g. select a.*, b.* from a, b; 4. In dynamic partition mode, strict mode requires at least one static partition column.
3.7 Data skew
For data skew, you can refer to my earlier Spark article on Spark Core tuning; it should help you understand the various aspects of skew.
Finally
Feel free to follow my long-neglected WeChat public account; I will gradually resume updating it. I hope everyone got something out of this.