Background
When Spark is used for data processing, it is inevitable that multiple datasets need to be joined, for example matching records against a reference database or enriching fields from a dimension table. This is where data skew commonly occurs, and when it does, it seriously degrades job execution efficiency and can even cause the job to fail.
What is data skew?
Data skew is a problem that greatly affects the efficiency of distributed data processing. It occurs when one node in a distributed system carries far more data than the others; that node then processes slowly or fails outright, dragging down the entire job.
Common shuffle operations such as Join, ReduceByKey, and GroupByKey can all cause data skew. This section uses Join as an example.
Join types
Spark provides various ways to Join multiple datasets.
The differences between LeftJoin and InnerJoin are not discussed here, nor are underlying implementation principles such as HashJoin and SortMergeJoin. Instead, we discuss the implementation type of the Join, and more specifically, where the Join takes place.
There are two common implementation types, ReduceJoin and MapJoin, which occur in the Reduce and Map parts of the program respectively.
Reduce Join
ReduceJoin, the most common Join type, happens on the Reduce side (strictly speaking, Spark tasks do not have a "Reduce side", but the term is convenient here). Typically, after the Map stage finishes for the two tables being joined, the data is shuffled to disk or memory; the downstream Join task (the Reduce side) then pulls the shuffle data and performs the Join.
ReduceJoin has two risk points:
- Shuffle is expensive. Data has to be written out on the Map side and sent to the downstream Reduce side through memory, disk, and network I/O, which consumes a lot of hardware resources.
- The shuffle needs one or more fields to decide which node each record is distributed to. By default, the shuffle distribution key is the Join condition key.
Because of the second risk point, if the key field in the Join condition is unevenly distributed, data skew arises easily. For example, if the account field contains many empty or null values, you have to decide, according to business requirements, whether to discard those records or handle them in some other way.
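You can see this distribution in the physical plan. Below is a minimal sketch (the table and column names orders, accounts, and account are illustrative assumptions); with two large tables, Spark uses a shuffle-based join:

explain
select *
from orders o
join accounts a on o.account = a.account;
-- The plan shows an Exchange hashpartitioning(account, ...) feeding a
-- SortMergeJoin: rows are redistributed across nodes by the join key,
-- so a hot or empty key piles up on a single node.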
Map Join
MapJoin is a Join optimization that happens on the Map side. Because it involves no shuffle operation, data skew is far less likely to occur.
The principle of MapJoin is that the smaller of the two tables being joined is broadcast into the memory of every executor, and the larger table's data is then compared against the in-memory copy of the small table. Shuffling is avoided entirely and the computation runs against in-memory data, so performance is high and data skew rarely occurs.
However, MapJoin cannot be used in every scenario. Its precondition is roughly the following:
The small table must be small enough. If the small table is too large, the broadcast times out or the executors throw OutOfMemory exceptions.
Data skew solutions
When data skew occurs, there are various solutions, and the right one depends on the sizes of the left and right tables, the cluster's computing resources, and the business requirements. Assuming cluster computing resources are not the bottleneck, the following solutions are available.
Small table Join large table
This case is easily solved with MapJoin. In general, you can add a MapJoin hint in SQL (the table names bigtable and smalltable below are illustrative):
select /*+ MAPJOIN(smalltable) */ *
from bigtable join smalltable on bigtable.key = smalltable.key
Alternatively, you can raise the table-size threshold below which Spark applies MapJoin automatically, and increase the timeout of the broadcast that MapJoin performs, to improve Join performance:
set spark.sql.autoBroadcastJoinThreshold = 104857600; -- auto-MapJoin threshold in bytes (here 100 MB)
set spark.sql.broadcastTimeout = 600; -- broadcast timeout in seconds
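To confirm that MapJoin actually took effect, inspect the physical plan (same illustrative table names as above):

explain
select /*+ MAPJOIN(smalltable) */ *
from bigtable join smalltable on bigtable.key = smalltable.key;
-- The plan should contain a BroadcastHashJoin (with a BroadcastExchange)
-- instead of a SortMergeJoin preceded by a shuffle Exchange.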
Large table Join large table
Large table Join large table is more complicated: neither the left nor the right table can be loaded into memory for a MapJoin.
When such a Join skews, the column in the table's Join condition is unevenly distributed. You can observe the key distribution with SQL:
select
  key, count(1) as cnt
from
  table
group by key
order by cnt desc
There are two cases: skew caused by a large number of invalid keys, and skew caused by valid keys.
Skew caused by invalid keys
This is usually caused by improper data reporting, such as empty strings or null values in the user behavior table, and these records can be filtered out in advance.
If the requirement is to keep the records whose key is empty, we can scatter those records with random values, and then restore the random values back to null after the query completes:
-- Records with a valid key are used as-is
select * from table where length(key) > 0

-- For records with an empty key, replace the key with a random
-- 32-character value so they spread evenly across nodes
select md5(cast(rand() as string)) as key from table where length(key) = 0

-- After the query completes, restore the scattered keys back to null
select if(length(key) == 32, null, key) as key from table

Other marking rules can be used as well.
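Putting the pieces together, here is a minimal sketch of the whole flow; the table names behavior and dim and the column names val and dim_val are illustrative assumptions:

with scattered as (
  select case when key is null or length(key) = 0
              then md5(cast(rand() as string)) -- random 32-character placeholder
              else key
         end as key,
         val
  from behavior
)
select if(length(s.key) == 32, null, s.key) as key, -- restore placeholders to null
       s.val,
       d.dim_val
from scattered s
left join dim d on s.key = d.key;

The random placeholder keys never match the dimension table, so those rows come back with null dimension columns, which is the correct result for an empty key. Note that the length-32 rule assumes no legitimate key is exactly 32 characters long.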
Skew caused by valid keys
In this case the hot keys are real and must be kept: for example, a blacklisted user in the behavior stream generates a huge number of bot records, or a single account receives a very large number of payments. We want to keep this data, but it skews the computation, so the Key needs further processing (see the sketch after the list):
- For the skewed table, prepend a random number within N to each Key, for example a random number from 0 to 20, so the Key becomes randomNumber_key. This step essentially breaks the hot keys further apart to avoid the skew problem.
- For the other table, expand each record on its Join key by the same N digits, so that the processed tables can still Join correctly: one key becomes N keys, such as 0_key, 1_key, 2_key, ..., 20_key.
- Join the two processed tables, strip the prefix from the result set, and the data processing is complete.
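Here is a minimal Spark SQL sketch of the three steps, assuming illustrative tables skewed_t(key, val) and other_t(key, dim_val) and a salt range of 0 to 19 (20 values):

with salted as (
  -- Step 1: prepend a random prefix in [0, 20) to the skewed table's key
  select key, val,
         concat(cast(floor(rand() * 20) as string), '_', key) as salted_key
  from skewed_t
),
expanded as (
  -- Step 2: expand the other table, emitting one row per possible prefix
  select concat(cast(n as string), '_', key) as salted_key, dim_val
  from other_t
  lateral view explode(sequence(0, 19)) s as n
)
-- Step 3: join on the salted key; the original key was carried through,
-- so no prefix stripping is needed in the final result
select a.key, a.val, b.dim_val
from salted a
join expanded b on a.salted_key = b.salted_key;

Carrying the original key through the first CTE avoids parsing it back out of the salted key, which would be fragile if keys can themselves contain underscores. Note that sequence() requires Spark 2.4 or later; on older versions the prefixes can be generated with a union instead.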