1. Hadoop framework computing features

  1. Data volume is not the problem; data skew is.
  2. A query that spawns many jobs runs inefficiently. For example, even if a table has only a few hundred rows, repeatedly joining and aggregating it can generate a dozen or more jobs and take a long time, because initializing a MapReduce job is itself time-consuming.
  3. Aggregate functions (UDAFs) such as sum, count, max, and min are not vulnerable to data skew, because Hadoop performs partial aggregation on the map side before the shuffle.
  4. count(DISTINCT userid) is inefficient on large data volumes, and multiple distinct fields, as in count(DISTINCT userid, month), are even worse, because count(DISTINCT) groups by the GROUP BY fields and sorts by the distinct fields, which easily causes skew. For example, if Taobao logs 3 billion page views in a day and you group by gender with 2 reducers, each reducer is expected to process 1.5 billion rows, but in reality one gender is bound to have more rows than the other. A common rewrite is sketched below.
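A common mitigation is to deduplicate with GROUP BY in a subquery instead of count(DISTINCT). A minimal sketch, assuming a hypothetical table pv_log with a userid column:

-- Skew-prone: count(DISTINCT) sorts by userid and can overload a few reducers
SELECT COUNT(DISTINCT userid) FROM pv_log;

-- Usually safer on large data: deduplicate first, then count the groups
SELECT COUNT(1) FROM (SELECT userid FROM pv_log GROUP BY userid) t;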

2. Common optimization approaches

  1. A good model design gets you twice the result for half the effort.
  2. Solve the data skew problem.
  3. Reduce the number of jobs.
  4. Setting a proper number of map and reduce tasks improves performance. (For example, for a computation on the order of 100,000 rows, 160 reducers is wasteful; 1 is enough.)
  5. It pays to understand the data distribution and solve skew yourself. Generic algorithmic optimization sometimes cannot adapt to the specific business context; developers who understand the business and the data can solve skew accurately and effectively through business logic.
  6. When data volumes are large, use count(DISTINCT) with caution; it easily skews the GROUP BY phase.
  7. Merging small files is an effective way to improve scheduling efficiency. If every job is given a reasonable number of files, the overall scheduling efficiency of the cluster also benefits.
  8. When optimizing, look at the whole picture: a single optimal job is not as good as an optimal whole.

3. Choosing a sort

Cluster by: distributes and sorts by the same field; it cannot be used together with sort by

Distribute by + sort by: distribute by partitions rows so that a given field value lands in only one result file; combined with sort by, it guarantees that each reduce task's output is ordered

Sort by: sorts only within each single reducer

“Order by”: global ordering; its drawback is that it can use only one reducer

“Be sure to distinguish between how these four sorts are used and where they apply!”
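A minimal sketch contrasting the four, assuming a hypothetical table logs(uid, ts):

-- ORDER BY: global order over all data, forced through a single reducer
SELECT * FROM logs ORDER BY ts;

-- SORT BY: ordered within each reducer only; output is not globally sorted
SELECT * FROM logs SORT BY ts;

-- DISTRIBUTE BY + SORT BY: rows with the same uid go to one reducer; each reducer's output is sorted by ts
SELECT * FROM logs DISTRIBUTE BY uid SORT BY ts;

-- CLUSTER BY: shorthand for DISTRIBUTE BY uid SORT BY uid (ascending only)
SELECT * FROM logs CLUSTER BY uid;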

4. How to handle the Cartesian product

When Hive runs in strict mode (hive.mapred.mode=strict), Cartesian products are not allowed in HQL statements. This hints that Hive does not handle Cartesian products well: because no join key can be found, only one reducer is used to compute the whole Cartesian product.

You can of course use limit to reduce the amount of data a table contributes to the join, but for workloads that genuinely need Cartesian-product semantics, the join is usually between one large table and one small table, and the result is still too large to handle on a single machine. In that case MapJoin is the best solution. MapJoin, as the name implies, performs the join on the map side; this requires that one or more of the joined tables be read entirely into memory.

PS: MapJoin may have unknown bugs in subqueries. When a large table and a small table need a Cartesian product, add an artificial join key so the Cartesian product becomes a regular join.

“The principle is simple: add a join-key column to the small table and replicate its rows several times, each copy with a different key value; add a join-key column to the large table filled with random numbers over the same range.”

In essence, the small table is replicated N times, so its artificial join key takes N distinct values; the large table's join key is a random number over the same range of N values, so its rows are split randomly into N groups. The final join therefore runs on N reducers, and no single reducer is skewed.
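A minimal sketch of the trick with N = 2, using hypothetical tables big and small:

-- Small table: two copies, tagged with join keys 1 and 2 (N = 2)
-- Large table: each row gets a random join key in {1, 2}
SELECT b.*, s.*
FROM (SELECT big.*, CAST(rand() * 2 AS INT) + 1 AS jk FROM big) b
JOIN (SELECT small.*, 1 AS jk FROM small
      UNION ALL
      SELECT small.*, 2 AS jk FROM small) s
ON (b.jk = s.jk);

Increasing N spreads the join across more reducers at the cost of replicating the small table more times.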

5. How to write in/exists statements

Although testing shows that Hive 1.2.1 does support in/exists, the efficient Hive-native alternative is recommended: LEFT SEMI JOIN.

For example:

select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id = b.id);

Should be converted to:

select a.id, a.name from a left semi join b on a.id = b.id;

6. Set a reasonable number of MapTasks

If the number of maps is too large:

  • Each map outputs only a tiny file, producing a large number of small files
  • Initializing and creating map tasks is expensive

If the number of maps is too small:

  • File processing and query concurrency is low, and job execution takes longer
  • With a large number of jobs, the cluster can easily become congested

From the MapReduce programming model, we know that the number of MapTasks for an MR job is determined by the input splits (InputSplit), which are computed by FileInputFormat.getSplits(). One input split corresponds to one MapTask, and the split size is determined by three parameters: minSize, maxSize, and blockSize (the HDFS block size). The input split size is calculated as follows:

long splitSize = Math.max(minSize, Math.min(maxSize, blockSize))

By default, the input split size equals the HDFS block size, that is, one MapTask is started per data block. This avoids transferring data between server nodes and improves processing efficiency.
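Worked example of the formula, assuming a 128 MB block and the default minSize = 1: splitSize = max(1, min(maxSize, 128 MB)) = 128 MB as long as maxSize is at least the block size. Lowering maxSize below the block size is therefore one way to get more maps (a sketch; parameter defaults vary across Hadoop versions):

set mapred.max.split.size=64000000; ## ~64 MB splits: roughly two maps per 128 MB block
set mapred.min.split.size=1;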

There are two classic directions for controlling the number of MapTasks: reducing it and increasing it.

1. Reduce the number of MapTasks by merging small files; this mainly targets the data source.

2. Increase the number of MapTasks by controlling the number of reduceTask of the previous job

Because Hive statements are ultimately converted into a series of MapReduce jobs, and each MapReduce job consists of a series of MapTasks and ReduceTasks, by default each MapTask or ReduceTask starts its own JVM process, which exits when the task finishes.

If tasks are short and the JVM must be started many times, JVM startup time becomes a significant expense. This can be addressed by reusing JVMs:

set mapred.job.reuse.jvm.num.tasks=5

7. Merge small files

A large number of small files puts pressure on HDFS and hurts processing efficiency. Merging the map and reduce result files eliminates this impact:

set hive.merge.mapfiles = true;               ## merge small files at the end of a map-only task
set hive.merge.mapredfiles = false;           ## true merges small files at the end of a MapReduce task
set hive.merge.size.per.task = 256000000;     ## size of the merged files
set mapred.max.split.size = 256000000;        ## maximum split size per map
set mapred.min.split.size.per.node = 1;       ## minimum split size on one node
set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; ## merge small files before the map phase

8. Set a reasonable number of ReduceTasks

In Hadoop MapReduce, the number of reducers greatly affects execution efficiency, which makes setting it a key problem for Hive. Unfortunately, Hive's estimation mechanism is weak: if no reducer count is specified, Hive guesses one based on the following two settings:

“1. hive.exec.reducers.bytes.per.reducer (default: 256000000)
2. hive.exec.reducers.max (default: 1009)
3. mapreduce.job.reduces = -1 (set to a positive constant to fix the number of ReduceTasks)”

The formula for the number of reducers is simple: N = min(parameter 2, total input size / parameter 1). In most cases it is necessary to specify the reducer count manually. And since the map stage usually outputs much less data than it reads, even if you do not set the reducer count, it is often necessary to lower parameter 2.

Based on Hadoop experience, parameter 2 can be set to 0.95*(number of Datanodes in the cluster).
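A minimal sketch of both approaches (the DataNode count of 20 is an assumption for illustration):

set hive.exec.reducers.bytes.per.reducer=256000000;
set hive.exec.reducers.max=19; // 0.95 * 20 DataNodes (assumed cluster size)
set mapreduce.job.reduces=8; // or simply pin a constant reducer count for this job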

9. Merge MapReduce operations

Multi-group-by is a very nice Hive feature that makes it easy to reuse intermediate results. For example:

FROM (SELECT a.status, b.school, b.gender 
FROM status_updates a JOIN profiles b ON (a.userid =
b.userid and a.ds='2009-03-20' ) ) subq1
INSERT OVERWRITE TABLE gender_summary PARTITION(ds='2009-03-20')
SELECT subq1.gender, COUNT(1) GROUP BY subq1.gender
INSERT OVERWRITE TABLE school_summary PARTITION(ds='2009-03-20')
SELECT subq1.school, COUNT(1) GROUP BY subq1.school

The query above uses the multi-group-by feature to group the data twice in a row with different GROUP BY keys, saving one MapReduce job.

10. Rational use of Bucketing and Sampling

Bucketing hashes data into a specified number of buckets using the chosen column as the key, which enables efficient sampling. The following example buckets by userid into 32 buckets.

CREATE TABLE page_view(viewTime INT, userid BIGINT,
 page_url STRING, referrer_url STRING,
 ip STRING COMMENT 'IP Address of the User')
 COMMENT 'This is the page view table'
 PARTITIONED BY(dt STRING, country STRING)
 CLUSTERED BY(userid) SORTED BY(viewTime) INTO 32 BUCKETS
 ROW FORMAT DELIMITED
 FIELDS TERMINATED BY '1'
 COLLECTION ITEMS TERMINATED BY '2'
 MAP KEYS TERMINATED BY '3'
 STORED AS SEQUENCEFILE;

Normally, sampling scans all the data, so it is inefficient and must access everything. If, on the other hand, a table is already bucketed on a column, sampling can read only a specified subset of the buckets, reducing the amount of data accessed.

The following example samples all data in the third of page_view's 32 buckets:

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 32);

The following example samples half of the third bucket (3 out of 64 on a 32-bucket table):

SELECT * FROM page_view TABLESAMPLE(BUCKET 3 OUT OF 64);

11. Rational use of Partition

Partitioning is enabled with PARTITIONED BY when creating a table. The partition dimension is not a column of the actual data; the specific partition value is supplied when content is inserted. To query one partition's contents, use a WHERE clause of the form WHERE tablename.partition_column = a.

Create a table with partitions:

CREATE TABLE page_view(viewTime INT, userid BIGINT,
 page_url STRING, referrer_url STRING,
 ip STRING COMMENT 'IP Address of the User')
PARTITIONED BY(date STRING, country STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY '1'
STORED AS TEXTFILE;

Load data and specify the partition values:

load data local inpath '/home/hadoop/pv_2008-06-08_us.txt' into table page_view
partition(date='2008-06-08', country='US');

Query the contents of specified partitions:

SELECT page_views.* FROM page_views
WHERE page_views.date >= '2008-03-01' AND page_views.date <= '2008-03-31' AND
page_views.referrer_url like '%xyz.com';

12. Join Optimization

General principles:

  1. Put the table or subquery with fewer rows on the left side of the join (explained below).
  2. When a small table joins a large table, it is best to enable MapJoin.
  3. Joins that share the same join key are merged into a single job (explained below).

As a rule when writing join queries, place the table or subquery with fewer entries on the left side of the join operator.

The reason is that in the reduce phase of a join, the contents of the table on the left of the join operator are loaded into memory; putting the table with fewer rows on the left effectively reduces the chance of OOM errors. When one statement contains multiple joins, the behavior depends on whether the join conditions are the same. For example:

INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view pv
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.userid = x.userid);

If the join keys are the same, no matter how many tables are involved they are merged into a single map-reduce job rather than N jobs; the same holds for OUTER JOIN.

If join conditions are different, for example:

INSERT OVERWRITE TABLE pv_users
SELECT pv.pageid, u.age FROM page_view pv
JOIN user u ON (pv.userid = u.userid)
JOIN newuser x ON (u.age = x.age);

then the number of map-reduce jobs matches the number of joins. The preceding query is equivalent to the following:

INSERT OVERWRITE TABLE tmptable
SELECT * FROM page_view pv JOIN user u ON (pv.userid = u.userid);

INSERT OVERWRITE TABLE pv_users
SELECT x.pageid, x.age FROM tmptable x JOIN newuser y ON (x.age = y.age);

When writing a join query, if you determine that data skew is caused by the join, apply the following settings:

set hive.skewjoin.key=100000; // if a join key matches more rows than this, they are split off; set according to the actual data volume

set hive.optimize.skewjoin=true; // Set to true if the join process is skewed

13. Group By optimization

1. Partial aggregation on the Map side

Not every aggregation needs to happen entirely on the reduce side; many aggregations can be partially performed on the map side, with the reduce side producing the final result.

This corresponds to MapReduce's Combiner. The relevant parameters are:

set hive.map.aggr = true; // whether to aggregate on the map side; default true

set hive.groupby.mapaggr.checkinterval = 100000; // number of rows at which map-side aggregation is performed

2. Load balancing is performed when Group By has data skew

set hive.groupby.skewindata = true

If a statement uses GROUP BY and the data is skewed, setting this variable to true makes Hive load-balance automatically. “The strategy is to split the work into two MR jobs: the first does a partial aggregation, the second the final aggregation.”

In the first MR job, the map output is distributed randomly across the reducers; each reducer performs a partial aggregation and emits its result. Because rows with the same GROUP BY key may land on different reducers, the load is balanced. The second job distributes the pre-aggregated results to the reducers by the GROUP BY key (guaranteeing that identical keys reach the same reducer) and completes the final aggregation.
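Conceptually, the two jobs behave like the following manual rewrite (only a sketch: src and key are hypothetical names, and the rand()-based salt is merely illustrative):

-- Inner query ≈ job 1: a random salt spreads hot keys over reducers for partial counts
-- Outer query ≈ job 2: sum the partial counts by the real key
SELECT key, SUM(partial_cnt) AS cnt
FROM (
  SELECT key, COUNT(1) AS partial_cnt
  FROM src
  GROUP BY key, CAST(rand() * 100 AS INT)
) t
GROUP BY key;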

14. Make proper use of file storage formats

When creating tables, prefer the ORC or Parquet columnar storage formats: each column's data is stored together physically, so Hive queries read only the columns they need, greatly reducing the amount of data processed.
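A minimal sketch (table name and columns are hypothetical; STORED AS PARQUET works the same way):

CREATE TABLE user_events (
  userid BIGINT,
  event STRING,
  ts TIMESTAMP
)
STORED AS ORC;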

15. Execute MapReduce in local mode

By default, Hive queries run distributed across N servers in the cluster, which requires coordinating multiple machines. But if the amount of data a query processes is small, distributed execution is unnecessary overhead, since it involves cross-network transfer, multi-node coordination, and extra resource consumption. In that case the MapReduce job can run in local mode on a single machine, which is fast. Local mode involves three parameters:

set hive.exec.mode.local.auto=true turns on the switch that lets Hive decide automatically whether to start local mode. But this parameter alone does not guarantee local mode: the number of map tasks must also be below hive.exec.mode.local.auto.input.files.max, and the map input size below hive.exec.mode.local.auto.inputbytes.max, before local mode actually starts.
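Putting the three together (the file-count and byte thresholds shown are illustrative assumptions; check the defaults of your Hive version):

set hive.exec.mode.local.auto=true;
set hive.exec.mode.local.auto.input.files.max=4;
set hive.exec.mode.local.auto.inputbytes.max=134217728; // 128 MB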

16. Parallel execution

A Hive SQL statement may be converted into multiple MapReduce jobs; each job is a stage, and by default these stages execute sequentially. Sometimes, however, they are not interdependent. If cluster resources allow, independent stages can run concurrently, saving time and speeding up execution. But if cluster resources are tight, enabling parallelism makes the jobs compete with each other for resources and degrades overall performance. To enable parallel execution:

“set hive.exec.parallel=true;
set hive.exec.parallel.thread.number=8; // maximum number of stages one SQL statement may run in parallel”

17. Set up compressed storage

1. Reasons for compression

Hive queries are converted to MapReduce, whose performance bottlenecks are network I/O and disk I/O. The key to relieving them is reducing the amount of data, and compression does exactly that.

Although compression reduces the amount of data, compressing itself consumes CPU. In Hadoop, however, the bottleneck is usually not the CPU; CPU pressure tends to be low, so compression puts the relatively idle CPU to good use.

2. Comparison of common compression methods

The codec class for each compression format:

  • DEFLATE: org.apache.hadoop.io.compress.DefaultCodec
  • Gzip: org.apache.hadoop.io.compress.GzipCodec
  • Bzip2: org.apache.hadoop.io.compress.BZip2Codec
  • Snappy: org.apache.hadoop.io.compress.SnappyCodec

3. Selection of compression mode

When choosing a compression format, consider:

  • the compression ratio
  • compression and decompression speed
  • whether the format supports splitting

4. Using compression

“Job output files are compressed by block in GZip mode:”

set mapreduce.output.fileoutputformat.compress=true // default is false

set mapreduce.output.fileoutputformat.compress.type=BLOCK // default is RECORD

set mapreduce.output.fileoutputformat.compress.codec=org.apache.hadoop.io.compress.GzipCodec // default is org.apache.hadoop.io.compress.DefaultCodec

“Map output is also compressed with Gzip:”

set mapred.map.output.compress=true

set mapreduce.map.output.compress.codec=org.apache.hadoop.io.compress.GzipCodec // default is org.apache.hadoop.io.compress.DefaultCodec

“Compress both Hive's final output and its intermediate data:”

set hive.exec.compress.output=true // default is false: no compression

set hive.exec.compress.intermediate=true // default is false; when true, intermediate MR data is compressed