Series of articles

Hive for Big Data Analytics (1) Hive for Big Data Analytics (2) Hive for Big Data Analytics (3)




Preface

This article covers advanced Hive operations. It mainly includes the following aspects: Hive table data compression and file storage formats, and enterprise-level tuning of Hive.


1. Compress Hive table data

1. Data compression description

  • Evaluation of compression mode
    • Compression methods can be evaluated against the following three criteria:
      • 1. Compression ratio: the higher the ratio, the smaller the compressed file, so a higher compression ratio is better
      • 2. Compression time: the faster the better
      • 3. Whether the compressed file can still be split: a splittable format allows a single file to be processed by multiple Mapper tasks in parallel
  • Common Compression formats
| Compression format | Compression ratio | Compression speed | Decompression speed | Splittable |
| --- | --- | --- | --- | --- |
| gzip | 13.4% | 21 MB/s | 118 MB/s | No |
| bzip2 | 13.2% | 2.4 MB/s | 9.5 MB/s | Yes |
| lzo | 20.5% | 135 MB/s | 410 MB/s | No |
| snappy | 22.2% | 172 MB/s | 409 MB/s | No |
  • Hadoop codec mode
| Compression format | Corresponding encoder/decoder |
| --- | --- |
| DEFLATE | org.apache.hadoop.io.compress.DefaultCodec |
| Gzip | org.apache.hadoop.io.compress.GzipCodec |
| BZip2 | org.apache.hadoop.io.compress.BZip2Codec |
| LZO | com.hadoop.compress.lzo.LzopCodec |
| Snappy | org.apache.hadoop.io.compress.SnappyCodec |
  • Comparison of compression performance
| Compression algorithm | Original file size | Compressed file size | Compression speed | Decompression speed |
| --- | --- | --- | --- | --- |
| gzip | 8.3 GB | 1.8 GB | 17.5 MB/s | 58 MB/s |
| bzip2 | 8.3 GB | 1.1 GB | 2.4 MB/s | 9.5 MB/s |
| LZO | 8.3 GB | 2.9 GB | 49.3 MB/s | 74.6 MB/s |

From google.github.io/snappy/:

On a single core of a Core i7 processor in 64-bit mode, Snappy compresses at about 250 MB/sec or more and decompresses at about 500 MB/sec or more.

2. Compression configuration parameters

  • To enable compression in Hadoop, configure the following parameters (in mapred-site.xml):

| Parameter | Default value | Phase | Advice |
| --- | --- | --- | --- |
| io.compression.codecs (configured in core-site.xml) | org.apache.hadoop.io.compress.DefaultCodec, org.apache.hadoop.io.compress.GzipCodec, org.apache.hadoop.io.compress.BZip2Codec, org.apache.hadoop.io.compress.Lz4Codec | Input compression | Hadoop uses file extensions to determine whether a particular codec is supported |
| mapreduce.map.output.compress | false | Mapper output | Set this parameter to true to enable compression |
| mapreduce.map.output.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | Mapper output | Use LZO, LZ4, or Snappy to compress data at this stage |
| mapreduce.output.fileoutputformat.compress | false | Reducer output | Set this parameter to true to enable compression |
| mapreduce.output.fileoutputformat.compress.codec | org.apache.hadoop.io.compress.DefaultCodec | Reducer output | Use a standard tool or codec such as gzip or bzip2 |
| mapreduce.output.fileoutputformat.compress.type | RECORD | Reducer output | The compression type used for SequenceFile output: NONE, RECORD, or BLOCK |

3. Enable Map output compression

  • Enabling map output phase compression reduces the amount of data transferred between Map and Reduce tasks in jobs. The configuration is as follows:

Case practice:

1) An HQL statement may be converted into multiple jobs, e.g. the output of job1 is used as the input of job2. Enable compression of intermediate result data between jobs (default: false):
hive (default)>set hive.exec.compress.intermediate=true;

2) Enable map output compression in MapReduce (default: false):
hive (default)>set mapreduce.map.output.compress=true;

3) Set the compression codec for map output data in MapReduce (default: DefaultCodec):
hive (default)>set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

4) Execute a query statement:
hive (default)>select count(1) from score;

4. Enable Reduce output compression

  • When Hive writes output to a table, the output can also be compressed. The attribute hive.exec.compress.output controls this function. Its default value is false, so the default output is an uncompressed plain text file. You can enable output compression by setting this value to true in the query statement or execution script.

Case practice:

1) Enable compression for Hive output data (default: false):
hive (default)>set hive.exec.compress.output=true;

2) Enable compression of the final MapReduce output (default: false):
hive (default)>set mapreduce.output.fileoutputformat.compress=true;

3) Set the compression codec for the final MapReduce output (default: DefaultCodec):
hive (default)>set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.SnappyCodec;

4) Set the final MapReduce output compression to block compression (default: RECORD):
hive (default)>set mapreduce.output.fileoutputformat.compress.type=BLOCK;

5) Test whether the output is a compressed file:
insert overwrite local directory '/kkb/install/hivedatas/snappy'
select * from score distribute by s_id sort by s_id desc;

2. Hive table file storage format

  • Hive supports the following data storage formats: TEXTFILE, SEQUENCEFILE, ORC, and PARQUET.

1. Column and row storage

  • For the same logical table, row storage lays out all the fields of each row contiguously, while column storage stores all the values of each column together.

  • Characteristics of row storage: when querying a whole row that meets a condition, column storage has to look up the value of every column from each column's clustered storage, whereas row storage only needs to locate one value and the rest of the row sits adjacent to it, so whole-row queries are faster with row storage.

  • Characteristics of column storage: because the data of each field is stored together, queries that only need a few columns read much less data; and because every value in a column has the same data type, compression algorithms can be tailored per column. Selecting a few specific fields is therefore more efficient.

  • TEXTFILE and SEQUENCEFILE are row-based storage formats;

  • ORC and PARQUET are based on column storage.

2. TEXTFILE format

  • The default format. Data is not compressed, so disk overhead and data parsing overhead are high. It can be combined with Gzip or Bzip2 compression (Hive automatically detects the compressed files and decompresses them during queries), but with Gzip, Hive cannot split the file, so it cannot parallelize operations on it (see the sketch below).
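As a quick illustration, a gzip-compressed text file can be loaded into a TEXTFILE table as-is, and Hive decompresses it transparently at query time, though each .gz file is read by a single mapper. A minimal sketch (the table name, columns, and file path are hypothetical):

create table log_text_gz (
  id string,
  msg string
)
row format delimited fields terminated by '\t'
stored as textfile;

-- load a gzip-compressed file directly; Hive decompresses it when the table is queried,
-- but the single .gz file cannot be split across mappers
load data local inpath '/tmp/log.txt.gz' into table log_text_gz;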

3. ORC format

  • Optimized Row Columnar (Orc) is a new storage format introduced in Hive 0.11.

  • Each ORC file consists of one or more stripes, each about 250 MB in size. A stripe is roughly equivalent to the RowGroup concept, but much larger (about 250 MB instead of 4 MB), which improves sequential read throughput. Each stripe consists of three parts: Index Data, Row Data, and Stripe Footer:

  • An ORC file can be divided into several stripes

  • A stripe can be divided into three parts

    • IndexData: index data for some columns

    • RowData: the actual data store

    • StripeFooter: metadata of the stripe

      1) Index Data: a lightweight index, by default one entry every 10,000 rows. The index only records the offset of each field of a row within Row Data.

      2) Row Data: stores the actual data. It takes a batch of rows and stores them by column; each column is encoded into multiple streams for storage.

      3) Stripe Footer: stores the metadata of each stripe.

  • Each File has a File Footer, which stores the number of rows in each Stripe, data type of each Column, etc.

  • At the end of each file is a PostScript, which records the compression type of the whole file and the length of the File Footer. When reading a file, the reader first seeks to the end of the file and reads the PostScript, parses the File Footer length from it, then reads and parses the File Footer to get the information of each stripe, and finally reads each stripe, from back to front.

4. PARQUET format

  • Parquet is a columnar storage format for analytics. Developed by Twitter and Cloudera, Parquet graduated from Apache’s incubator as an Apache Top-level project in May 2015.

  • A Parquet file is stored in ==binary== form and therefore cannot be read directly. The file contains both its ==data and metadata==, so Parquet files are self-describing.

  • Generally, the row group size is set according to the block size when Parquet data is stored. Since the minimum unit of data processed by each Mapper task is one block, each row group can then be handled by one Mapper task, which increases task parallelism. The layout of a Parquet file is described below.

  • A Parquet file can contain multiple row groups. The file starts with its magic code, used to verify whether it is a Parquet file; this value and the file length can be used to calculate the offset of the metadata. The file metadata includes the metadata of every row group and the schema of the data stored in the file. Besides the per-row-group metadata stored in the file metadata, each page stores its own metadata at its beginning. There are three types of pages in Parquet: data pages, dictionary pages, and index pages. A data page stores the values of a column in the current row group; a dictionary page stores the encoding dictionary of the column values (each column chunk contains at most one dictionary page); an index page stores the index of the column in the current row group. Parquet does not currently support index pages.

5. Mainstream file storage format (TEXTFILE/ORC/PARQUET)

Construction method:

create table log_text (
...
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE/ORC/PARQUET;

3. Combination of storage and compression

Official documentation: cwiki.apache.org/confluence/…

ORC storage mode compression:

| Key | Default | Notes |
| --- | --- | --- |
| orc.compress | ZLIB | high level compression (one of ==NONE, ZLIB, SNAPPY==) |
| orc.compress.size | 262144 | number of bytes in each compression chunk |
| orc.stripe.size | 67108864 | number of bytes in each stripe |
| orc.row.index.stride | 10000 | number of rows between index entries (must be >= 1000) |
| orc.create.index | true | whether to create row indexes |
| orc.bloom.filter.columns | "" | comma separated list of column names for which bloom filter should be created |
| orc.bloom.filter.fpp | 0.05 | false positive probability for bloom filter (must be > 0.0 and < 1.0) |

1. Create a table stored as ORC without compression

(1) Table creation statement:

create table log_orc_none(
...
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="NONE");

2. Create a table stored as ORC with SNAPPY compression

(1) Table creation statement:

create table log_orc_snappy(
...
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS orc tblproperties ("orc.compress"="SNAPPY");

3. Summary of storage format and compression:

In actual project development, the data storage format of Hive tables is usually ORC or Parquet, and the compression codec is usually Snappy.
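For a Parquet table, compression can likewise be set through a table property. A minimal sketch (the table name and columns are hypothetical, and the parquet.compression property is assumed to be honored by the Hive version in use):

create table log_parquet_snappy(
  id string,
  msg string
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
STORED AS parquet tblproperties ("parquet.compression"="SNAPPY");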

4. Hive enterprise-level tuning

1. Fetch

  • Fetch means that certain Hive queries do not require a MapReduce computation at all

    • Such as: select * from score;
    • In this case, Hive can simply read the files in the table's storage directory and output the query results to the console
  • In the hive-default.xml.template file, ==hive.fetch.task.conversion== defaults to more (in older Hive versions the default was minimal). With this attribute set to more, MapReduce is not used for global lookups (select *), field lookups, or LIMIT queries.

  • Case practice

    • Set hive.fetch.task.conversion to ==none==, then execute a query; MapReduce jobs will be launched
set hive.fetch.task.conversion=none;
    • Set hive.fetch.task.conversion back to ==more==, then execute the same query; no MapReduce job is launched
set hive.fetch.task.conversion=more;
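To see the difference, queries like the following (reusing the score table and its s_id column from earlier examples) run as plain fetch tasks under more but each launch a MapReduce job under none:

-- with hive.fetch.task.conversion=more these read the table's files directly;
-- with none each statement starts a MapReduce job
select * from score;
select s_id from score;
select s_id from score limit 3;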

2. Local mode

  • During Hive client testing, Hadoop's job mode is enabled by default and tasks are submitted to the cluster for execution, which makes computation on small data sets slow.

  • Hive can process tasks on a single machine in local mode. For small data sets, the execution time can be significantly reduced.

  • Case practice

    Before starting local mode, run the statement

set hive.exec.mode.local.auto;
select * from score cluster by s_id;

Enable local Mode

-- Enable local mode and execute the query statement; default false
set hive.exec.mode.local.auto=true;  -- Enable local MR

-- Set the maximum total input size for local MR. If the input size is smaller than this value, local MR is used.
-- The default value is 134217728, that is, 128 MB
set hive.exec.mode.local.auto.inputbytes.max=50000000;

-- Set the maximum number of input files for local MR. If the number of input files is smaller than this value, local MR is used.
-- Default is 4
set hive.exec.mode.local.auto.input.files.max=5;

-- SQL statement to execute the query
select * from score cluster by s_id;

-- Disable local running mode
set hive.exec.mode.local.auto=false;

3. Optimization of tables

1 Small table join large table
  • Putting the table with scattered keys and a small amount of data on the left side of the join can effectively reduce the chance of out-of-memory errors. Going further, a Map Join can be used to load small dimension tables (roughly under 1,000 rows) into memory in advance and complete the join on the Map side.

  • Test results show that newer versions of Hive have already optimized both small-table-join-large-table and large-table-join-small-table; there is no obvious difference between putting the small table on the left or on the right.

  • When multiple tables are joined, it is best to split the query into small segments, avoiding one huge SQL statement (whose intermediate jobs cannot be controlled).

2 Large table join large table
  • 1. Empty key filter

    • Sometimes a join times out because some keys have too much data; since rows with the same key are all sent to the same reducer, that reducer runs out of memory.

    • In this case, we should analyze these abnormal keys carefully. In most cases the rows for these keys are dirty data, which should be filtered out in the SQL statement.

    • Example:

INSERT OVERWRITE TABLE jointable
SELECT a.* FROM (SELECT * FROM nullidtable WHERE id IS NOT NULL ) a JOIN ori b ON a.id = b.id;

2. Empty key conversion

  • Sometimes a null key corresponds to a lot of rows, but those rows are not dirty data and must be included in the join result. In that case, we can assign a random value to the null-key field in table a so that the rows are distributed randomly and evenly across the reducers.

    Non-random distribution:

Example:

-- The default value is 256000000, that is 256m
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN 'hive' ELSE a.id END = b.id;
No rows affected (41.668 seconds)  

Result: all ids with null values become the same string, which easily leads to data skew (all those keys are identical, and rows with the same key are sent to the same reducer).

To solve this problem, we can use Hive’s RAND function to randomly assign a value to each empty ID so that the data does not skew

  • Random distribution:
set hive.exec.reducers.bytes.per.reducer=32123456;
set mapreduce.job.reduces=7;

INSERT OVERWRITE TABLE jointable
SELECT a.*
FROM nullidtable a
LEFT JOIN ori b ON CASE WHEN a.id IS NULL THEN concat('hive', rand()) ELSE a.id END = b.id;
3 map join
  • If MapJoin is not specified or the MapJoin conditions are not met, the Hive parser converts the join into a Common Join, i.e. the join is completed in the Reduce phase, which is prone to data skew. MapJoin can load the small table entirely into memory and perform the join on the Map side, so no reducer has to process it.

  • 1. Enable the MapJoin parameter

-- Defaults to true
set hive.auto.convert.join = true;
  • 2. Threshold between large and small tables (by default, tables under 25 MB are treated as small)
set hive.mapjoin.smalltable.filesize=26214400;
  • 3. Working mechanism of MapJoin

The first stage is Task A, a local task (executed locally on the client), which scans the data of small table b, converts it into a HashTable data structure, writes it to a local file, and then loads that file into the DistributedCache.

The next stage is Task B, a MapReduce job without a Reduce phase. It starts MapTasks to scan large table a; in the Map phase, each record of a is looked up against the HashTable of table b in the DistributedCache, and the join result is output directly.

Because MapJoin has no Reduce phase, the Map tasks write the result files directly; there are as many result files as there are Map tasks.
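With the parameters above, Hive normally converts qualifying joins into map joins automatically; the older MAPJOIN hint can also request one explicitly. A minimal sketch (smalltable is a hypothetical table name, and depending on the Hive version hive.ignore.mapjoin.hint may need to be set to false for the hint to take effect):

select /*+ MAPJOIN(b) */ a.id
from bigtable a
join smalltable b on a.id = b.id;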

4 group By
  • By default, the Map phase sends all rows with the same key to one reducer; if a single key has too much data, the job is skewed.

  • Not all aggregation operations need to be performed on the Reduce end. Some aggregation operations can be performed on the Map end and the final result can be obtained on the Reduce end.

  • Parameters for enabling Map-side aggregation

-- Whether to aggregate on the Map side; default true
set hive.map.aggr = true;
-- The number of rows examined on the Map side for aggregation; default 100000
set hive.groupby.mapaggr.checkinterval = 100000;
-- Load balancing when data is skewed; default false
set hive.groupby.skewindata = true;

When this option is set to true, the generated query plan has two MR jobs. In the first MR job, the Map output is distributed randomly to the reducers; each reducer performs a partial aggregation and outputs its result, so rows with the same Group By key may end up in different reducers, which balances the load. The second MR job then distributes the pre-aggregated results to the reducers by Group By key (which guarantees that the same Group By key goes to the same reducer) and completes the final aggregation.
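For instance, with the settings above a skewed aggregation such as the following (reusing the score table) is planned as two MR jobs, the first doing partial aggregation on randomly distributed keys:

select s_id, count(*) from score group by s_id;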
5 count(distinct)
  • When the amount of data is small this does not matter, but when the amount of data is large, a COUNT DISTINCT is carried out by a single Reduce task, and if that one reducer has to process too much data the whole job may never finish. COUNT DISTINCT is therefore generally replaced by GROUP BY followed by COUNT, as sketched below.
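A minimal sketch of the rewrite (bigtable and its id column are borrowed from the join examples above):

-- single-reducer version: every value is shuffled to one reducer to be de-duplicated and counted
select count(distinct id) from bigtable;

-- rewritten version: GROUP BY spreads the de-duplication across reducers, then a second job counts the groups
select count(id) from (select id from bigtable group by id) a;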

4. Use partition pruning and column pruning

  • Filter out as much data as possible as early as possible to avoid large amounts of data flowing into the outer SQL.
  • Column pruning
    • Read only the columns that are needed, reducing data input.
    • Use select * as little as possible
  • Partition pruning
    • A partition is essentially a directory in Hive. Partition pruning can filter out most of the data directly.
    • Use partition filters whenever possible, as in the sketch below
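A minimal sketch of partition pruning, assuming score is partitioned by month as in the later examples:

-- only the files under the month='201806' partition directory are scanned
select s_id, s_score from score where month = '201806';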

An inefficient pattern is to join first and filter in the WHERE clause afterwards:

SELECT a.id
FROM bigtable a
LEFT JOIN ori b ON a.id = b.id
WHERE b.id <= 10;

The correct way is to filter in the ON clause: filter first, then join:

SELECT a.id
FROM ori a
LEFT JOIN bigtable b ON (a.id <= 10 AND a.id = b.id);

Or write it as a subquery:

SELECT a.id
FROM bigtable a
RIGHT JOIN (SELECT id
FROM ori
WHERE id <= 10
) b ON a.id = b.id;

5. Parallel execution

  • Run the stages of an SQL statement that do not depend on each other in parallel; this improves cluster resource utilization
-- Enable parallel execution; default false
set hive.exec.parallel=true;
-- The maximum degree of parallelism allowed for a single SQL statement; default 8
set hive.exec.parallel.thread.number=16;
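For instance, the two independent aggregations below (a sketch reusing the score table and its month partition) can run as parallel stages once hive.exec.parallel is enabled:

select * from (
  select s_id, count(*) cnt from score group by s_id
  union all
  select s_id, count(*) cnt from score where month='201806' group by s_id
) t;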

6. Strict mode

  • Hive provides a strict mode that prevents users from executing queries that may have unintended adverse effects.

  • The property hive.mapred.mode defaults to the non-strict mode nonstrict. To enable strict mode, change the value of hive.mapred.mode to strict. With strict mode enabled, the following three types of query are prohibited:

-- Set non-strict mode (default nonstrict)
set hive.mapred.mode=nonstrict;

-- Set strict mode
set hive.mapred.mode=strict;
  • (1) For partitioned tables, a query is not allowed to execute unless the WHERE clause contains a filter on the partition column to limit the range

    In strict mode the following SQL statement fails with an error; in non-strict mode it executes normally (score is a partitioned table):
    select * from score;
    Error: Error while compiling statement: FAILED: SemanticException [Error 10041]: No partition predicate found for Alias "order_partition" Table "order_partition"
  • (2) For queries that use ORDER BY, a LIMIT clause must also be specified

    In strict mode the following SQL statement fails with an error; in non-strict mode it executes normally:
    select * from score where month='201806' order by s_score;
    Error: Error while compiling statement: FAILED: SemanticException 1:61 In strict mode, if ORDER BY is specified, LIMIT must also be specified. Error encountered near token 'order_price'
  • (3) Cartesian product queries are restricted

    In strict mode, queries that would produce a Cartesian product (a join without an ON condition) are rejected.
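A minimal sketch of a query that strict mode rejects for this reason (the self-join of score below has no ON condition and therefore produces a Cartesian product):

select * from score a join score b;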

7. JVM reuse

  • JVM reuse is a Hadoop tuning parameter that has a significant impact on Hive performance, especially for scenarios where it is difficult to avoid small files or where there are a large number of tasks, most of which have very short execution times.

    Hadoop's default configuration usually uses a newly forked JVM to execute each Map or Reduce task. The JVM startup overhead can be considerable, especially when the job contains hundreds or thousands of tasks. JVM reuse lets a JVM instance be reused up to N times within the same job. The value of N can be configured in Hadoop's mapred-site.xml file, usually between 10 and 20, depending on tests for the specific business scenario.

<property>
  <name>mapreduce.job.jvm.numtasks</name>
  <value>10</value>
  <description>How many tasks to run per jvm. If set to -1, there is
  no limit. 
  </description>
</property>
  • We can also set it from within Hive:
set mapred.job.reuse.jvm.num.tasks=10;

This setting configures JVM reuse for our jobs.

The downside of this feature is that enabling JVM reuse holds on to the occupied task slots for reuse until the job is finished. If a few Reduce tasks of an unbalanced job run much longer than the others, the reserved slots stay idle and cannot be used by other jobs until every task of the job has completed.

8. Use EXPLAIN (execution plan)

  • View the HQL execution plan
explain select * from score where month='201806';
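For more detail, EXPLAIN EXTENDED prints additional information in the plan (a minimal sketch reusing the same query):

explain extended select * from score where month='201806';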

9. Compression

See the data compression section above.

  • Compression of data in Hive tables

    # Set to true to enable intermediate data compression; the default is false (disabled)
    set hive.exec.compress.intermediate=true;
    #Set the compression algorithm for intermediate data
    set mapred.map.output.compression.codec= org.apache.hadoop.io.compress.SnappyCodec;
  • Compress the output of the Hive table

    set hive.exec.compress.output=true;
    set mapred.output.compression.codec= 
    org.apache.hadoop.io.compress.SnappyCodec;

10. Data skew

1 Set a proper number of maps
    1) Typically, a job generates one or more Map tasks from the files in the input directory.
    The main determining factors are: the total number of input files, the size of the input files, and the block size configured for the cluster. For example:
    a) If the input directory contains a single file a of 780 MB, Hadoop splits it into 7 blocks (six 128 MB blocks and one 12 MB block), producing 7 maps.
    b) If the input directory contains three files a, b, and c of 10 MB, 20 MB, and 150 MB, Hadoop splits them into 4 blocks (10 MB, 20 MB, 128 MB, and 22 MB), producing 4 maps.
    That is, a file larger than the block size (128 MB) is split, and a file smaller than the block size is treated as one block.
  • 2) Is more maps always better?

    The answer is no. If a task has many small files (far smaller than the 128 MB block size), each small file is treated as a block and handled by its own map task, and the time spent starting and initializing a map task is far longer than its actual processing time, which wastes a lot of resources. Moreover, the number of maps that can run at the same time is limited.
  • 3) Is it enough to make sure each map handles a block close to 128 MB?

    Not necessarily. For example, a 127 MB file would normally be handled by one map, but if the file has only one or two small fields yet tens of millions of rows, and the map processing logic is complex, a single map task will certainly be slow. To address problems 2) and 3), we need two approaches: reducing the number of maps and increasing the number of maps.
2 Merge small files
  • Merge small files before map execution to reduce the number of maps:

  • CombineHiveInputFormat Combines small files (default format)

    set mapred.max.split.size=112345600;
    set mapred.min.split.size.per.node=112345600;
    set mapred.min.split.size.per.rack=112345600;
    set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

    Files larger than the 128 MB block size are split at 128 MB; pieces between 100 MB and 128 MB are split at about 100 MB; and pieces smaller than 100 MB (including small files and the remainders left over from splitting large files) are merged together.

3 Add the Map number for complex files
  • If the input files are large, the task logic is complex, and the map execution is slow, you can increase the number of maps to reduce the amount of data processed by each map and improve the task execution efficiency.

  • You can add a map as follows

    • According to the formula ==computeSplitSize(Math.max(minSize, Math.min(maxSize, blockSize)))==
    • ==Lower the maxSize value==: setting maxSize below blockSize makes the splits smaller and increases the number of maps.
    mapreduce.input.fileinputformat.split.minsize=1 (default 1)
    mapreduce.input.fileinputformat.split.maxsize=Long.MAX_VALUE (default)
    Therefore, by default, split size = blockSize.
    maxSize (maximum split size): if this parameter is set smaller than blockSize, the split becomes smaller, equal to the value configured here.
    minSize (minimum split size): if this parameter is set larger than blockSize, the split can become larger than blockSize.
    • For example,
    -- Set maxsize to 10M, i.e. FileSplit to 10M
    set mapreduce.input.fileinputformat.split.maxsize=10485760;
4 Set a proper Reduce number
  • 1. Method 1 for adjusting the number of reducers

    • 1) The amount of data processed by each Reduce task defaults to 256 MB

      set hive.exec.reducers.bytes.per.reducer=256000000;

    • 2) The maximum number of Reduce tasks per job defaults to 1009

      set hive.exec.reducers.max=1009;

    • 3) Formula for calculating the number of reducers

      N = min(parameter 2, total input data size / parameter 1)

      For example, with the defaults above and 1 GB of input, N = min(1009, 1024 MB / 256 MB) = 4 reducers.
  • 2. Method 2 for adjusting the number of reducers

    -- Set the number of Reduce tasks for each job
    set mapreduce.job.reduces=3;
  • 3. More reducers is not always better

  • Starting and initializing too many reduce tasks also consumes time and resources.

  • In addition, each reduce task produces its own output file; too many reducers create many output files, which can reintroduce the small-file problem

Conclusion

For more practical content, follow my personal WeChat official account: BB Man's Big Data Journey, and follow to receive extra resources.