Common Hive joins

Before we start, let's define what a join is and then look at the different kinds of join. We often need data from two, three, or more tables at the same time, which means associating the tables and reading the combined rows.

Generally speaking, Hive joins can be divided into Common Join (the join completes in the reduce phase), Map Join (the join completes in the map phase), and Sort Merge Bucket Join. This classification reflects how the join is executed.

Joins can also be classified by type, for example JOIN, LEFT | RIGHT | FULL OUTER JOIN, and LEFT SEMI JOIN. Here the difference lies in what each join returns.

Map-side Join

The main idea of Map Join is that when one of the two tables is small and the other is very large, the small table is loaded directly into memory and the large table is then processed by map tasks. The join happens during the map phase: for every row scanned from the large table, the mapper looks up the matching rows in the small table and joins them.

This type of join involves no reduce phase and therefore no shuffle, which avoids the cost and delay of moving data over the network. Because the join is performed on the map side, it is called a Map Join.

Execution starts with a local task that reads the small table, wraps its data into a HashTable(key, value), and writes it to a file, which is then loaded through DistributedCache. Because the small table is held as a hash table, each lookup is cheap, so the cost of the join grows with the size of the large table only. Each mapper reads just its portion of the large table and probes the hash table for matches.
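The build-then-probe idea described above can be sketched in a few lines of Python. This is only an illustration of a generic hash join, not Hive's implementation; the function names and the sample rows are made up for the example.

```python
from collections import defaultdict

def build_hash_table(small_rows, key):
    """Local-task step: index the small table by its join key."""
    table = defaultdict(list)
    for row in small_rows:
        table[row[key]].append(row)
    return table

def map_join(large_rows, hash_table, key):
    """Map step: stream the large table and probe the in-memory hash table."""
    for row in large_rows:
        for match in hash_table.get(row[key], []):
            yield row, match

# Hypothetical sample data, not taken from the tables in this article.
small = [{"id": 1, "city": "weinan"}, {"id": 2, "city": "xian"}]
large = [{"id": 1, "name": "a"}, {"id": 3, "name": "b"}, {"id": 1, "name": "c"}]

hash_table = build_hash_table(small, "id")
joined = list(map_join(large, hash_table, "id"))
```

Only the small table is ever held in memory; the large table is read once, row by row, and no grouping by key across machines is needed.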

Not all scenarios are suitable for MapJoin. It is usually used in the following scenarios:

  1. Of the two tables to join, one is large and one is small, and this small table can be stored in memory without affecting performance. This allows us to copy the small table files locally to each Map task and have Map read the files into memory for later use.

  2. It is used to mitigate data skew. When the large table is skewed, some reducers receive far more data than others. Because a Map Join happens on the map side, the reduce phase is eliminated entirely.

  3. Non-equality join conditions. Hive has traditionally not supported non-equi joins, so such a condition can only be written in WHERE. MapReduce then computes a Cartesian product, which is extremely inefficient, and only afterwards filters out the required rows in WHERE. Map Join is commonly used to make this case cheaper.
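To see why data skew hurts a reduce-side join, the following sketch counts how many rows a hash partitioner would send to each reducer. It is a simplified model under assumed numbers (4 reducers, one dominant key), and Python's `hash` stands in for the real partitioning function.

```python
from collections import Counter

def rows_per_reducer(keys, num_reducers):
    """Count how many rows a hash partitioner sends to each reducer."""
    counts = Counter({r: 0 for r in range(num_reducers)})
    for k in keys:
        counts[hash(k) % num_reducers] += 1
    return counts

# Hypothetical skewed join column: id 1 appears 9000 times,
# while ids 2..1001 appear once each.
keys = [1] * 9000 + list(range(2, 1002))
load = rows_per_reducer(keys, num_reducers=4)
```

In this model one reducer receives almost all of the rows while the other three stay nearly idle. A map join sidesteps the problem because rows are never partitioned by key in the first place.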

Map Join can be set with two parameters, the first parameter is to enable Map Join and the second parameter is to limit the size of small tables

set hive.auto.convert.join=true; -- enable automatic MapJoin optimization (on by default)
set hive.mapjoin.smalltable.filesize=25000000; -- tables smaller than this size (about 25 MB) qualify as the small table

Before Hive 0.7, you had to add the hint /*+ MAPJOIN(table) */ to get a map join; otherwise Hive ran a Common Join. From 0.7 on, Hive can convert a join to a map join automatically. This is controlled by the hive.auto.convert.join parameter, whose default value is true in recent releases.

First let’s create a slightly larger table, about 500M

create table ods_user_log(
     id int,
     name string,
     city string,
     phone string,
     acctime string)
row format delimited fields terminated by '\t'
stored as textfile;
LOAD DATA LOCAL INPATH '/Users/liuwenqiang/access.log' OVERWRITE INTO TABLE ods_user_log;

Then we create a small table that extracts 100 entries from the table above

create table ods_user_log_small as select * from ods_user_log limit 100;

Next, let’s create a join statement to query

select
    a.*
from
    ods_user_log a
inner join
    ods_user_log_small b
on
    a.id=b.id
limit 10
;

The query took 28.614s. Here is an excerpt of the processing log:

2021-01-02 14:40:50,409 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:50,409 INFO [main] exec.FilterOperator (Operator.java:initialize(344)) - Initializing operator FIL[13]
2021-01-02 14:40:50,639 INFO [main] exec.SelectOperator (Operator.java:initialize(344)) - Initializing operator SEL[5]
2021-01-02 14:40:50,642 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:50,642 INFO [main] exec.SelectOperator (SelectOperator.java:initializeOp(73)) - SELECT struct<id:int,name:string,city:string,phone:string,acctime:string>
2021-01-02 14:40:50,642 INFO [main] exec.HashTableSinkOperator (Operator.java:initialize(344)) - Initializing operator HASHTABLESINK[15]
2021-01-02 14:40:50,642 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:50,642 INFO [main] mapjoin.MapJoinMemoryExhaustionHandler (MapJoinMemoryExhaustionHandler.java:<init>(61)) - JVM Max Heap Size: 239075328
2021-01-02 14:40:50,653 INFO [main] persistence.HashMapWrapper (HashMapWrapper.java:calculateTableSize(97)) - Key count from statistics is -1; Setting the map size to 100000
2021-01-02 14:40:50,675 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - No unit for dfs.client.datanode-restart.timeout(30) assuming SECONDS
2021-01-02 14:40:50,675 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:50,654 INFO [main] mr.MapredLocalTask (MapredLocalTask.java:initializeOperators(516)) - fetchoperator for $hdt$_1:b initialized
2021-01-02 14:40:51,258 INFO [main] Configuration.deprecation (Configuration.java:logDeprecation(1395)) - mapred.input.dir is deprecated. Instead, use mapreduce.input.fileinputformat.inputdir
2021-01-02 14:40:51,321 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:51,320 INFO [main] mapred.FileInputFormat (FileInputFormat.java:listStatus(259)) - Total input files to process : 1
2021-01-02 14:40:51,352 INFO [main] sasl.SaslDataTransferClient (SaslDataTransferClient.java:checkTrustAndSend(239)) - SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-01-02 14:40:51,423 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:51,423 INFO [main] exec.TableScanOperator (Operator.java:logStats(1038)) - RECORDS_OUT_OPERATOR_TS_3:100, RECORDS_OUT_INTERMEDIATE:0,
2021-01-02 14:40:51,423 INFO [main] exec.SelectOperator (Operator.java:logStats(1038)) - RECORDS_OUT_OPERATOR_SEL_5:100, RECORDS_OUT_INTERMEDIATE:0,
2021-01-02 14:40:51,423 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:51,423 INFO [main] exec.FilterOperator (Operator.java:logStats(1038)) - RECORDS_OUT_INTERMEDIATE:0, RECORDS_OUT_OPERATOR_FIL_"0,
2021-01-02 14:40:51,423 INFO [main] exec.HashTableSinkOperator (HashTableSinkOperator.java:flushToFile(293)) - Temp URI for side table: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3
2021-01-02 14:40:51	Dump the side-table for tag: 1 with group count: 9 into file: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
2021-01-02 14:40:51,423 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main-LocalTask-MAPREDLOCAL-stdout] mr.MapredLocalTask (LogRedirector.java:run(65)) - 2021-01-02 14:40:51,423 INFO [main] exec.HashTableSinkOperator (SessionState.java:printInfo(1227)) - 2021-01-02 14:40:51 Dump the side-table for tag: 1 with group count: 9 into file: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
2021-01-02 14:40:51	Uploaded 1 File to: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable (789 bytes)
2021-01-02 14:40:51,456 INFO [main] exec.HashTableSinkOperator (SessionState.java:printInfo(1227)) - 2021-01-02 14:40:51 Uploaded 1 File to: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable (789 bytes)

There are several important pieces of information in the above processing log that can help us understand the map Join execution

  1. mr.MapredLocalTask Initializing operator
  2. Initializing operator HASHTABLESINK
  3. Dump the side-table for tag: 1 with group count: 9 into file: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable
  4. Uploaded 1 File to: file:/tmp/hive/local/66c753b7-56d4-429c-bb87-14eed166a61a/hive_2021-01-02_14-40-36_550_872109057128790183-2/-local-10004/HashTable-Stage-3/MapJoin-mapfile01--.hashtable (789 bytes)

The execution log shows that a real Map Join was performed, so Map Join was enabled automatically. Now we turn Map Join off (hive.auto.convert.join set to false) and run the query again.

1	Sujianbo	weinan	14730379051	2020-03-29 21:12:15
1	Sujianbo	weinan	14730379051	2020-03-29 21:12:15
1	Sujianbo	weinan	14730379051	2020-03-29 21:12:15
...
2021-01-02 14:49:06,853 INFO [66c753b7-56d4-429c-bb87-14eed166a61a main] exec.ListSinkOperator (Operator.java:logStats(1038)) - RECORDS_OUT_INTERMEDIATE:0, RECORDS_OUT_OPERATOR_LIST_SINK_14:10,
Time taken: 43.117 seconds, Fetched: 10 row(s)

Because Map Join is disabled this time, the log output related to Map Join no longer appears, and the execution time is much longer (43.117s versus 28.614s). The larger the data volume, the more pronounced the benefit of Map Join.

Reduce-side (Common) Join

The reduce-side join is the join Hive falls back on when no map-side optimization applies. It is the simplest join mode, and its main idea is as follows:

In the map stage, the map function reads both inputs, File1 and File2. To distinguish key/value pairs from the two sources, each pair is tagged with its source table in the map output value. The shuffle then ensures that rows with the same key reach the same reducer. In the reduce stage, the reducer uses the tags to tell the sources apart, performs the join logic, and returns the required columns in the order of the SQL SELECT list.

SELECT a.id,a.dept,b.age
FROM a join b
ON (a.id = b.id);
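The tag, shuffle, and reduce steps behind the query above can be imitated in plain Python. This is a conceptual sketch only: the function names, the tags "a" and "b", and the sample rows are assumptions made for the example, and a real job would spread the groups across many reducers.

```python
from collections import defaultdict
from itertools import product

def map_phase(rows, tag, key):
    """Emit (join key, (tag, row)) so the reducer can tell the sources apart."""
    for row in rows:
        yield row[key], (tag, row)

def shuffle(pairs):
    """Group values by key, as the shuffle does before the reduce phase."""
    groups = defaultdict(list)
    for k, v in pairs:
        groups[k].append(v)
    return groups

def reduce_phase(groups):
    """Within each key, pair every row tagged 'a' with every row tagged 'b'."""
    for k, tagged in groups.items():
        left = [r for t, r in tagged if t == "a"]
        right = [r for t, r in tagged if t == "b"]
        for l, r in product(left, right):
            yield {"id": k, "dept": l["dept"], "age": r["age"]}

# Hypothetical contents for tables a and b.
a = [{"id": 1, "dept": "sales"}, {"id": 2, "dept": "ops"}]
b = [{"id": 1, "age": 30}, {"id": 3, "age": 41}]

pairs = list(map_phase(a, "a", "id")) + list(map_phase(b, "b", "id"))
result = list(reduce_phase(shuffle(pairs)))
```

Keys that appear in only one table (2 and 3 here) form groups with an empty side and produce no output, which matches inner-join semantics.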

SMB Join (Sort Merge Bucket)

A bucket is a file in a table's directory. When both tables are bucketed on the join key, we do not need to scan the entire table; we only need to scan the corresponding bucket, because rows with the same key must be in the same bucket. SMB Join is designed for joins between two large tables. The core idea is to turn the large tables into small ones and then apply a map-side join, a typical divide-and-conquer approach.
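As a rough illustration of the divide-and-conquer idea, the sketch below splits two tables into the same number of buckets, keeps each bucket sorted by the join key, and then joins bucket i of one table only with bucket i of the other using a standard sort-merge pass. All names and data are hypothetical, and Python's `hash` stands in for Hive's bucketing hash.

```python
def bucketize(rows, key, num_buckets):
    """Split rows into buckets by hash(key) % num_buckets, sorted by key inside each bucket."""
    buckets = [[] for _ in range(num_buckets)]
    for row in rows:
        buckets[hash(row[key]) % num_buckets].append(row)
    return [sorted(b, key=lambda r: r[key]) for b in buckets]

def merge_join(left, right, key):
    """Join two lists that are already sorted by key, advancing two cursors."""
    out, i, j = [], 0, 0
    while i < len(left) and j < len(right):
        lk, rk = left[i][key], right[j][key]
        if lk < rk:
            i += 1
        elif lk > rk:
            j += 1
        else:
            # Find the run of equal keys on the right, then pair it with each equal left row.
            j_end = j
            while j_end < len(right) and right[j_end][key] == lk:
                j_end += 1
            while i < len(left) and left[i][key] == lk:
                out.extend((left[i], r) for r in right[j:j_end])
                i += 1
            j = j_end
    return out

def smb_join(left_buckets, right_buckets, key):
    """Join bucket i of one table only with bucket i of the other."""
    out = []
    for lb, rb in zip(left_buckets, right_buckets):
        out.extend(merge_join(lb, rb, key))
    return out

# Hypothetical tables: ids 0..9 on one side, even ids on the other.
table_a = [{"id": i, "name": f"user{i}"} for i in range(10)]
table_b = [{"id": i, "city": f"city{i}"} for i in range(0, 10, 2)]

joined = smb_join(bucketize(table_a, "id", 5), bucketize(table_b, "id", 5), "id")
```

Each bucket pair is small and already sorted, so the merge needs only a single forward pass and no hash table for the whole table.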

One note: which bucket a row lands in depends not only on the value of the key but also on the number of buckets, because the bucket is chosen by taking the key's hash value modulo the number of buckets. Therefore we generally require not only that the two tables are bucketed on the same field, but also that their bucket counts are multiples of each other (equal counts also work).
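The multiple relationship can be checked with a little arithmetic. The sketch below assumes integer keys and uses Python's `hash` as a stand-in for the bucketing hash; the bucket counts 4 and 8 and the helper names are chosen only for illustration.

```python
def bucket_of(key, num_buckets):
    """Bucket index for a key: hash value modulo the bucket count."""
    return hash(key) % num_buckets

def matching_buckets(small_bucket, small_count, large_count):
    """Buckets of the table with more buckets that can hold keys from `small_bucket`.

    Only meaningful when large_count is a multiple of small_count.
    """
    if large_count % small_count != 0:
        raise ValueError("bucket counts must be multiples of each other")
    return [b for b in range(large_count) if b % small_count == small_bucket]

# For every key, record which bucket it falls into with 4 and with 8 buckets.
keys = range(100)
pairs = {(bucket_of(k, 4), bucket_of(k, 8)) for k in keys}
```

With 4 and 8 buckets, every key in bucket 1 of the first table can only be in bucket 1 or bucket 5 of the second, so each bucket still needs to be joined with just a fixed subset of the other table.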

set hive.enforce.bucketing=true;
set hive.enforce.sorting=true;

The goal of this table layout: keep rows with the same key together as far as possible.

So let’s take the data from the table in the example above and create two new tables

create table ods_user_bucket_log(
     id int,
     name string,
     city string,
     phone string,
     acctime string)
CLUSTERED BY (`id`) SORTED BY (`id`) INTO 5 BUCKETS
row format delimited fields terminated by '\t'
stored as textfile;

create table ods_user_bucket_2_log(
     id int,
     name string,
     city string,
     phone string,
     acctime string)
CLUSTERED BY (`id`) SORTED BY (`id`) INTO 5 BUCKETS
row format delimited fields terminated by '\t'
stored as textfile;
insert overwrite table ods_user_bucket_log select * from ods_user_log;
insert overwrite table ods_user_bucket_2_log select * from ods_user_log;

Since we specified 5 buckets, each table should have 5 files on HDFS.

Let’s execute the SQL

set hive.auto.convert.sortmerge.join=true;
set hive.optimize.bucketmapjoin=true;
set hive.optimize.bucketmapjoin.sortedmerge=true;
select a.id,b.name from ods_user_bucket_log a inner join ods_user_bucket_2_log b on a.id=b.id limit 10;
-- Non-bucket table
reset;
select a.id,b.name from ods_user_log a inner join ods_user_log_2 b on a.id=b.id limit 10;

Below is the bucket table execution

The following is the execution of a non-bucket table

The bucketed tables run faster, but the difference is small because our tables are not very large. The larger the tables, the more pronounced the effect.

Conclusion

  1. Map Join is particularly useful for performance optimization and has many applications, such as joining a large table with a small one, non-equality joins, and data skew.

  2. SMB Join is mainly used for joins between large tables. Hive does not check whether the two tables being joined are actually bucketed and sorted. Users must ensure this themselves; otherwise the results may be incorrect.

  3. Reduce-side (Common) Join is the most common join type. It relies on the shuffle to distribute rows by key.