Personal blog post
Prerequisites
- Hive V1 bucketed tables
- Other data source connectors that support buckets need to implement Presto-specific methods. @David: Assuming the connector hashes the same way Hive does, and two tables bucketed the same way are compatible, this could in theory be implemented in the Kudu connector. The connector needs to expose its bucketing and splits to the engine in a specific way.
How Presto's Grouped Execution works
Take two tables (orders, orders_item) that have the same number of buckets and are bucketed on the same field (orderid). Rows with the same orderid land in the bucket with the same ID in both tables, so when the tables are joined on orderid, the join and aggregation can be computed independently for each bucket (compare the partition step of MapReduce).
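The per-bucket independence described above can be sketched in a few lines of Python. This is only an illustration, not Presto's implementation: the modulo in `bucket_of` is a stand-in for the real bucketing hash (Hive's function differs), and the table contents and helper names are made up for the example.

```python
from collections import defaultdict

BUCKET_COUNT = 13


def bucket_of(order_id: int) -> int:
    # Stand-in for the bucketing hash; Hive's real function differs.
    return order_id % BUCKET_COUNT


def bucketize(rows):
    """Group (order_id, payload) rows by bucket id."""
    buckets = defaultdict(list)
    for order_id, payload in rows:
        buckets[bucket_of(order_id)].append((order_id, payload))
    return buckets


def join_rows(left_rows, right_rows):
    """Plain hash join: build on the right side, probe with the left side."""
    build = defaultdict(list)
    for order_id, payload in right_rows:
        build[order_id].append(payload)
    return [(oid, lp, rp) for oid, lp in left_rows for rp in build.get(oid, [])]


def grouped_join(left, right):
    """Join bucket i of `left` only with bucket i of `right`, one bucket at a time."""
    left_b, right_b = bucketize(left), bucketize(right)
    result = []
    for b in range(BUCKET_COUNT):
        # Only the build side of a single bucket is held in memory here.
        result.extend(join_rows(left_b.get(b, []), right_b.get(b, [])))
    return result


orders = [(i, f"order-{i}") for i in range(1, 41)]
order_items = [(i, f"item-{i}-{n}") for i in range(1, 41) for n in range(2)]

grouped = grouped_join(orders, order_items)
plain = join_rows(orders, order_items)  # the same join without bucketing
```

Both variants return the same rows; the grouped variant only ever builds a hash table over one bucket of the right-hand table at a time, which is where the memory saving comes from.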
Memory usage is limited by controlling how many buckets are processed in parallel.
Theoretical memory usage: optimized memory usage = original memory usage / number of buckets in the table * number of buckets processed in parallel
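As a rough worked example of this formula, here is a small Python helper. The function name and the sample numbers (a 102MB table with 13 buckets) are assumptions for illustration, and the result is only the theoretical value: it assumes rows are spread evenly across buckets.

```python
def grouped_memory_mb(original_mb: float, bucket_count: int, concurrent_buckets: int) -> float:
    """Theoretical memory use when only some buckets are processed at once.

    A concurrency of 0 is treated as "all buckets at once", i.e. no saving.
    """
    if bucket_count <= 0:
        raise ValueError("bucket_count must be positive")
    if concurrent_buckets <= 0 or concurrent_buckets > bucket_count:
        concurrent_buckets = bucket_count
    return original_mb / bucket_count * concurrent_buckets


one_at_a_time = grouped_memory_mb(102.0, 13, 1)    # one bucket in memory
four_at_a_time = grouped_memory_mb(102.0, 13, 4)   # four buckets in memory
all_at_once = grouped_memory_mb(102.0, 13, 0)      # equals the original usage
```

With one bucket at a time the estimate is 102 / 13, or roughly 7.8MB, instead of the full 102MB.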
Test environment
- Ubuntu 14.04
- PrestoSQL-317
- Hive connector (Hive 3.1)
- TPCH connector
Test steps
Use Hive as the default catalog, so table names do not need a hive prefix.
1. Create the tables
-- Copy data to Hive
create table orders as select * from tpch.sf1.orders;
-- drop table test_grouped_join1;
CREATE TABLE test_grouped_join1
WITH (bucket_count = 13, bucketed_by = ARRAY['key1']) as
SELECT orderkey key1, comment value1 FROM orders;
-- drop table test_grouped_join2;
CREATE TABLE test_grouped_join2
WITH (bucket_count = 13, bucketed_by = ARRAY['key2']) as
SELECT orderkey key2, comment value2 FROM orders;
-- drop table test_grouped_join3;
CREATE TABLE test_grouped_join3
WITH (bucket_count = 13, bucketed_by = ARRAY['key3']) as
SELECT orderkey key3, comment value3 FROM orders;
2. Test without Grouped Execution
-- Default settings
set session colocated_join=false;
set session grouped_execution=false;
-- View the execution plan
-- explain analyze
explain (TYPE DISTRIBUTED)
SELECT key1, value1, key2, value2, key3, value3
FROM test_grouped_join1
JOIN test_grouped_join2
ON key1 = key2
JOIN test_grouped_join3
ON key2 = key3
Execution plan result (long; can be skipped)
Fragment 0 [SINGLE]
    Output layout: [key1, value1, key1, value2, key1, value3]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[key1, value1, key2, value2, key3, value3]
    │   Layout: [key1:bigint, value1:varchar(79), key1:bigint, value2:varchar(79), key1:bigint, value3:varchar(79)]
    │   Estimates: {rows: 1500000 (268.28MB), cpu: 1.85G, memory: 204.60MB, network: 447.13MB}
    │   key2 := key1
    │   key3 := key1
    └─ RemoteSource[1]
           Layout: [key1:bigint, value1:varchar(79), value2:varchar(79), value3:varchar(79)]
Fragment 1 [hive:buckets=13, hiveTypes=[bigint]]
    Output layout: [key1, value1, value2, value3]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    InnerJoin[("key1" = "key3")][$hashvalue, $hashvalue_34]
    │   Layout: [key1:bigint, value1:varchar(79), value2:varchar(79), value3:varchar(79)]
    ├─ InnerJoin[("key1" = "key2")][$hashvalue, $hashvalue_31]
    │  │   Layout: [key1:bigint, value1:varchar(79), $hashvalue:bigint, value2:varchar(79)]
    │  ├─ ScanProject[table = hive:test:test_grouped_join1 bucket=13, grouped = false]
    │  │      Layout: [key1:bigint, value1:varchar(79), $hashvalue:bigint]
    │  │      Estimates: {rows: 1500000 (102.30MB), cpu: 89.43M, memory: 0B, network: 0B}/{rows: 1500000 (102.30MB), cpu: 191.73M, memory: 0B, network: 0B}
    │  │      $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("key1"), 0))
    │  │      key1 := key1:bigint:0:REGULAR
    │  │      value1 := value1:varchar(79):1:REGULAR
    │  └─ LocalExchange[HASH][$hashvalue_31] ("key2")
    │     │   Layout: [key2:bigint, value2:varchar(79), $hashvalue_31:bigint]
    │     │   Estimates: {rows: 1500000 (102.30MB), cpu: 396.33M, memory: 0B, network: 102.30MB}
    │     └─ RemoteSource[2]
    │            Layout: [key2:bigint, value2:varchar(79), $hashvalue_32:bigint]
    └─ LocalExchange[HASH][$hashvalue_34] ("key3")
       │   Layout: [key3:bigint, value3:varchar(79), $hashvalue_34:bigint]
       └─ RemoteSource[3]
              Layout: [key3:bigint, value3:varchar(79), $hashvalue_35:bigint]
Fragment 2 [hive:buckets=13, hiveTypes=[bigint]]
Output layout: [key2, value2, $hashvalue_33]
Output partitioning: hive:buckets=13, hiveTypes=[bigint] [key2]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = hive:test:test_grouped_join2 bucket=13, grouped = false]
Layout: [key2:bigint, value2:varchar(79), $hashvalue_33:bigint]
Estimates: {rows: 1500000 (102.30MB), cpu: 89.43M, memory: 0B, network: 0B}/{rows: 1500000 (102.30MB), cpu: 191.73M, memory: 0B, network: 0B}
$hashvalue_33 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("key2"), 0))
key2 := key2:bigint:0:REGULAR
value2 := value2:varchar(79):1:REGULAR
Fragment 3 [hive:buckets=13, hiveTypes=[bigint]]
Output layout: [key3, value3, $hashvalue_36]
Output partitioning: hive:buckets=13, hiveTypes=[bigint] [key3]
Stage Execution Strategy: UNGROUPED_EXECUTION
ScanProject[table = hive:test:test_grouped_join3 bucket=13, grouped = false]
Layout: [key3:bigint, value3:varchar(79), $hashvalue_36:bigint]
Estimates: {rows: 1500000 (102.30MB), cpu: 89.43M, memory: 0B, network: 0B}/{rows: 1500000 (102.30MB), cpu: 191.73M, memory: 0B, network: 0B}
$hashvalue_36 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("key3"), 0))
key3 := key3:bigint:0:REGULAR
value3 := value3:varchar(79):1:REGULAR
3. Test with Grouped Execution
set session colocated_join=true;
set session grouped_execution=true;
-- Number of buckets processed in parallel; 0 means all buckets are processed at once
set session concurrent_lifespans_per_task=1;
-- Left at its default value; its role is not explained here
set session dynamic_schedule_for_grouped_execution=false;
-- View the execution plan
-- explain (TYPE DISTRIBUTED)
explain analyze
SELECT key1, value1, key2, value2, key3, value3
FROM test_grouped_join1
JOIN test_grouped_join2
ON key1 = key2
JOIN test_grouped_join3
ON key2 = key3
Execution plan result (long; can be skipped)
Fragment 0 [SINGLE]
    Output layout: [key1, value1, key1, value2, key1, value3]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
    Output[key1, value1, key2, value2, key3, value3]
    │   Layout: [key1:bigint, value1:varchar(79), key1:bigint, value2:varchar(79), key1:bigint, value3:varchar(79)]
    │   Estimates: {rows: 1500000 (268.28MB), cpu: 1.65G, memory: 204.60MB, network: 242.53MB}
    │   key2 := key1
    │   key3 := key1
    └─ RemoteSource[1]
           Layout: [key1:bigint, value1:varchar(79), value2:varchar(79), value3:varchar(79)]
Fragment 1 [hive:buckets=13, hiveTypes=[bigint]]
    Output layout: [key1, value1, value2, value3]
    Output partitioning: SINGLE []
    Stage Execution Strategy: FIXED_LIFESPAN_SCHEDULE_GROUPED_EXECUTION
    InnerJoin[("key1" = "key3")][$hashvalue, $hashvalue_33]
    │   Layout: [key1:bigint, value1:varchar(79), value2:varchar(79), value3:varchar(79)]
    │   Estimates: {rows: 1500000 (242.53MB), cpu: 1.65G, memory: 204.60MB, network: 0B}
    │   Distribution: PARTITIONED
    ├─ InnerJoin[("key1" = "key2")][$hashvalue, $hashvalue_31]
    │  │   Layout: [key1:bigint, value1:varchar(79), $hashvalue:bigint, value2:varchar(79)]
    │  ├─ ScanProject[table = hive:test:test_grouped_join1 bucket=13, grouped = true]
    │  │      Layout: [key1:bigint, value1:varchar(79), $hashvalue:bigint]
    │  │      Estimates: {rows: 1500000 (102.30MB), cpu: 89.43M, memory: 0B, network: 0B}/{rows: 1500000 (102.30MB), cpu: 191.73M, memory: 0B, network: 0B}
    │  │      $hashvalue := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("key1"), 0))
    │  │      key1 := key1:bigint:0:REGULAR
    │  │      value1 := value1:varchar(79):1:REGULAR
    │  └─ LocalExchange[HASH][$hashvalue_31] ("key2")
    │     │   Layout: [key2:bigint, value2:varchar(79), $hashvalue_31:bigint]
    │     │   Estimates: {rows: 1500000 (102.30MB), cpu: 294.03M, memory: 0B, network: 0B}
    │     └─ ScanProject[table = hive:test:test_grouped_join2 bucket=13, grouped = true]
    │            Layout: [key2:bigint, value2:varchar(79), $hashvalue_32:bigint]
    │            $hashvalue_32 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("key2"), 0))
    │            key2 := key2:bigint:0:REGULAR
    │            value2 := value2:varchar(79):1:REGULAR
    └─ LocalExchange[HASH][$hashvalue_33] ("key3")
       │   Layout: [key3:bigint, value3:varchar(79), $hashvalue_33:bigint]
       │   Estimates: {rows: 1500000 (102.30MB), cpu: 294.03M, memory: 0B, network: 0B}
       └─ ScanProject[table = hive:test:test_grouped_join3 bucket=13, grouped = true]
              Layout: [key3:bigint, value3:varchar(79), $hashvalue_34:bigint]
              Estimates: {rows: 1500000 (102.30MB), cpu: 89.43M, memory: 0B, network: 0B}/{rows: 1500000 (102.30MB), cpu: 191.73M, memory: 0B, network: 0B}
              $hashvalue_34 := "combine_hash"(bigint '0', COALESCE("$operator$hash_code"("key3"), 0))
              key3 := key3:bigint:0:REGULAR
              value3 := value3:varchar(79):1:REGULAR
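The line that distinguishes the two plans is "Stage Execution Strategy" in each fragment. As a convenience, the following Python sketch pulls that value out of EXPLAIN text so the check does not depend on reading the whole plan. It assumes the usual layout with one plan line per text line; the helper names and the shortened sample plan are made up for the example.

```python
import re


def stage_strategies(plan_text: str) -> dict:
    """Map fragment id -> stage execution strategy found in EXPLAIN text."""
    strategies = {}
    fragment = None
    for line in plan_text.splitlines():
        header = re.match(r"\s*Fragment (\d+) \[", line)
        if header:
            fragment = int(header.group(1))
            continue
        strategy = re.search(r"Stage Execution Strategy: (\w+)", line)
        if strategy and fragment is not None:
            strategies[fragment] = strategy.group(1)
    return strategies


def uses_grouped_execution(plan_text: str) -> bool:
    """True if at least one fragment is not scheduled as UNGROUPED_EXECUTION."""
    return any(s != "UNGROUPED_EXECUTION" for s in stage_strategies(plan_text).values())


# Shortened sample: only the lines the helpers look at.
sample_plan = """\
Fragment 0 [SINGLE]
    Output partitioning: SINGLE []
    Stage Execution Strategy: UNGROUPED_EXECUTION
Fragment 1 [hive:buckets=13, hiveTypes=[bigint]]
    Output partitioning: SINGLE []
    Stage Execution Strategy: FIXED_LIFESPAN_SCHEDULE_GROUPED_EXECUTION
"""

strategies = stage_strategies(sample_plan)
grouped_in_use = uses_grouped_execution(sample_plan)
```

In the grouped run, the fragment that performs the joins reports FIXED_LIFESPAN_SCHEDULE_GROUPED_EXECUTION and its scans show grouped = true, while the single output fragment stays ungrouped.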
Analysis
Each table has 13 buckets (call this t). One table occupies 102MB once read into memory, so one bucket occupies 102MB / 13 = 7.8MB (call this m).
Presto was tested on a single machine with -Xmx1G, so the per-node memory limit for a single query (query.max-memory-per-node) is 102MB (call this A; the default is 0.1 * max JVM heap size).
Maximum number of buckets processed in parallel: call this n.
Number of tables joined in the SQL: 3 (call this s).
To avoid OOM, set concurrent_lifespans_per_task to a value smaller than 4.4, which corresponds to n = A / (s * m) = 102 / (3 * 7.8).
Test: with concurrent_lifespans_per_task set to 5, the query fails:
SQL Error [131079]: Query failed (#20190821_054413_00220_r4Jkt): Query exceeded per-node user memory limit of 102.40MB [Allocated: 102.38MB, Delta: 59.11kB, Top Consumers: {HashBuilderOperator=102.38MB}]
Note: these are theoretical values, for reference only. In practice they are affected by factors such as rows not being spread evenly across buckets.
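The arithmetic behind the 4.4 threshold can be reproduced with a short Python sketch. It assumes, as the figures in this analysis suggest, that the bound is the memory limit divided by (joined tables × memory per bucket) with 3 joined tables; the function name is made up, and the result is theoretical for the same reasons as the note above.

```python
import math


def max_concurrent_buckets(limit_mb: float, table_mb: float,
                           bucket_count: int, joined_tables: int) -> float:
    """Theoretical upper bound for concurrent_lifespans_per_task."""
    per_bucket_mb = table_mb / bucket_count          # m
    return limit_mb / (per_bucket_mb * joined_tables)  # A / (s * m)


bound = max_concurrent_buckets(limit_mb=102.0, table_mb=102.0,
                               bucket_count=13, joined_tables=3)
safe_setting = math.floor(bound)  # largest whole number of buckets under the bound
```

Without rounding m, the bound is 13 / 3, about 4.33; rounding m to 7.8MB first gives the 4.4 quoted above. Either way the largest safe whole-number setting is 4, which is consistent with the failure observed at 5.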
Usage scenarios
- Assume the maximum memory for a single query is 1GB
- Assume every table that participates in the join is 10GB once read into memory
Scenario 1: Bucket all tables on the same field into 10 buckets (or more, since extra headroom is needed, for example a 20% reserve); set concurrent_lifespans_per_task = 1.
Scenario 2: Bucket all tables on the same field into 20 buckets (or more, since extra headroom is needed, for example a 20% reserve); set concurrent_lifespans_per_task = 2.
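The bucket counts in the two scenarios follow from a simple sizing rule, sketched here in Python. The function name is hypothetical, and the way the 20% reserve is applied (shrinking the usable memory limit) is one possible reading of "20% reserved", not something the scenarios spell out.

```python
import math


def min_bucket_count(table_gb: float, limit_gb: float,
                     concurrent_buckets: int, reserve: float = 0.0) -> int:
    """Smallest bucket count so that the concurrently processed buckets fit the limit.

    `reserve` is the fraction of the limit kept free as headroom (assumption).
    """
    usable_gb = limit_gb * (1.0 - reserve)
    return math.ceil(table_gb * concurrent_buckets / usable_gb)


scenario_1 = min_bucket_count(10, 1, concurrent_buckets=1)   # 10GB table, 1GB limit
scenario_2 = min_bucket_count(10, 1, concurrent_buckets=2)
scenario_1_with_reserve = min_bucket_count(10, 1, concurrent_buckets=1, reserve=0.2)
```

With no reserve this reproduces the 10 and 20 buckets of the two scenarios; keeping 20% of the limit free raises the first scenario to 13 buckets.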
Reference documentation
- Presto Unlimited: MPP SQL Engine at Scale
- TestHiveIntegrationSmokeTest