"This is the 12th day of my participation in the November Gwen Challenge. Check out the event details: The Last Gwen Challenge of 2021."
Hello everyone, I am Huaijin Shake Yu, a big-data newbie with two little gold-swallowing beasts at home, Jia and Jia, an all-round dad who can code and can teach.
If you like my articles, please [follow ⭐] + [like 👍] + [comment 📃]; your support is my motivation, and I look forward to growing up with you~
1. Background
When writing SQL, it is common to join a large table with a small table and then apply logical grouping such as GROUP BY, together with many conditional judgments: lots of IF expressions in the SQL and structured lookups over value ranges. Run directly on Spark, such SQL involves a great deal of shuffle and executes extremely slowly.
This is especially true when there is a huge size gap between the large table and the small table: since the large table is the main object being processed, the shuffle and map stages take a very long time.
2. Optimization begins
2.1 Rewrite the program in Java code
The first option is to write the Spark program in Java code: unroll all the conditions, or broadcast the small table as a variable, and then evaluate and judge the conditions each time a record is processed.
However, this makes the code much harder to read, and if you rely on tools that run SQL directly to compute results, it also breaks the integrity of the program.
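To make that concrete, here is a minimal sketch of the pure-Java route. It assumes, for illustration only, that spark is an existing SparkSession, that tableb has the columns id, timeStart, timeEnd, tag in that order, and that each id carries a single interval (the TreeMap-based UDF below drops that last restriction):

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;

// Collect the small table to the driver and broadcast it: id -> {timeStart, timeEnd, tag}
Map<Long, long[]> small = new HashMap<>();
for (Row r : spark.table("tableb").collectAsList()) {
    small.put(r.getLong(0), new long[]{r.getLong(1), r.getLong(2), r.getLong(3)});
}
Broadcast<Map<Long, long[]>> bSmall =
        JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(small);

// Evaluate the conditions for every record of the large table; no shuffle join is involved
Dataset<Long> tags = spark.table("tablea").map(
        (MapFunction<Row, Long>) row -> {
            long[] interval = bSmall.value().get(row.getLong(0));
            long time = row.getLong(1);
            return (interval != null && time >= interval[0] && time <= interval[1])
                    ? interval[2] : -1L;
        }, Encoders.LONG());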
2.2 Use a UDF
A User-Defined Function (UDF) is a custom Hive function. When the built-in Hive functions cannot fully meet business requirements, you can define your own.
Here we build only the simplest kind of UDF: create a Hive function, then call it while querying the large table so that the value we need is returned directly for each row.
You can start by defining a UDF class
import java.util.Map;
import java.util.TreeMap;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.api.java.UDF2;

public class TripUDF implements UDF2<Long, Long, Long> {

    // id -> (interval boundary time -> tag), built from the small table and broadcast to the executors
    private final Map<Long, TreeMap<Long, Long>> map;

    public TripUDF(Broadcast<Map<Long, TreeMap<Long, Long>>> bmap) {
        this.map = bmap.getValue();
    }

    @Override
    public Long call(Long id, Long time) throws Exception {
        if (map.containsKey(id)) {
            // nearest interval boundaries on either side of the given time
            Map.Entry<Long, Long> a = map.get(id).floorEntry(time);
            Map.Entry<Long, Long> b = map.get(id).ceilingEntry(time);
            if (null != a && null != b) {
                // both boundaries carry the same tag -> time falls inside that interval
                if (a.getValue().equals(b.getValue())) {
                    return a.getValue();
                }
            }
        }
        return -1L; // no matching interval
    }
}
What this UDF does is take the small table's data, build it into a TreeMap with the interval boundaries as keys, and broadcast it; then every lookup from the large table matches on id and time, and a successful match yields the value we need.
In the original SQL, each time value in the large table has to fall within a time interval from the small table:
tablea join tableb
on tablea.id=tableb.id and
tablea.time >= tableb.timeStart and
tablea.time <= tableb.timeEnd
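The registration step below relies on a broadcast variable broadcast1, which the article does not show being built; here is a minimal sketch, again assuming that spark is the current SparkSession and that tableb exposes the columns id, timeStart, timeEnd, tag in that order (the names and order are assumptions):

import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;

// Build id -> (interval boundary -> tag) from the small table
Map<Long, TreeMap<Long, Long>> map = new HashMap<>();
for (Row r : spark.table("tableb").collectAsList()) {
    long id = r.getLong(0);
    long timeStart = r.getLong(1);
    long timeEnd = r.getLong(2);
    long tag = r.getLong(3);
    TreeMap<Long, Long> ranges = map.computeIfAbsent(id, k -> new TreeMap<>());
    // store both boundaries under the same tag, so floorEntry/ceilingEntry in the UDF
    // agree only when time actually falls inside [timeStart, timeEnd]
    ranges.put(timeStart, tag);
    ranges.put(timeEnd, tag);
}
// Broadcast the map so every executor can look it up without a shuffle
Broadcast<Map<Long, TreeMap<Long, Long>>> broadcast1 =
        JavaSparkContext.fromSparkContext(spark.sparkContext()).broadcast(map);

Note that this scheme assumes adjacent intervals for the same id carry different tag values; otherwise a time that falls between two intervals could still see equal floor and ceiling tags and be mistaken for a match.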
Spark then registers the UDF method
String udfMethod = "structureMap";
spark.udf().register(udfMethod, new TripUDF(broadcast1), DataTypes.LongType);
Now you can query the large table directly and apply the UDF to the relevant fields to get the result:
select id,time,structureMap(id,time) as tag from tablea
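In the Spark program this query is simply handed to spark.sql; a minimal usage sketch:

// The UDF is evaluated row by row on the executors, so no shuffle-heavy join
// against tableb is needed to produce the tag column
Dataset<Row> tagged = spark.sql("select id,time,structureMap(id,time) as tag from tablea");
tagged.show();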
The tag in the final result is the same value that the join with tableb would have produced, but the execution is now left to Spark to optimize, without the shuffle-heavy join.
3. Conclusion
If you like my articles, please [follow ⭐] + [like 👍] + [comment 📃]; your support is my motivation, and I look forward to growing up with you~
You can also follow my WeChat official account "Huaijin Shake Yu jia and Jia" to get resource downloads.