“This is the 12th day of my participation in the November Gwen Challenge. Check out the event details: The Last Gwen Challenge of 2021.”

Hello everyone, I am Huaijin Shake Yu, a big data newbie with two little gold-swallowing beasts at home, Jia and Jia, and an all-round dad who can write code and tutor homework.

If you like my articles, you can [follow ⭐] + [like 👍] + [comment 📃]. Your triple support is my motivation, and I look forward to growing with you~


1. Background

When writing SQL, we often join a large table with a small table and then do logical grouping such as group by, with many conditional checks: the SQL is full of IF statements and range-style lookups. Executing such SQL directly on Spark involves a lot of shuffle, and the execution is extremely slow.

This is especially true when there is a large size gap between the big table and the small table: since the big table is the main object being processed, the shuffle and map stages take a lot of time.

2. Optimization begins

2.1 Rewrite the program in Java

The first approach is to write the Spark program in Java: break all the conditions apart, or broadcast the small table as a broadcast variable, and then evaluate the conditions for each record as it is processed, as in the sketch below.
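A minimal sketch of this broadcast-variable approach, assuming the small table tableb holds one (timeStart, timeEnd) range per id; the positional column access and the variable names here are illustrative assumptions, not the original program:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;

// Collect the small table to the driver as id -> {timeStart, timeEnd}
Map<Long, long[]> small = new HashMap<>();
for (Row r : spark.sql("select id, timeStart, timeEnd from tableb").collectAsList()) {
    small.put(r.getLong(0), new long[]{r.getLong(1), r.getLong(2)});
}

// Broadcast one read-only copy of the map to every executor
Broadcast<Map<Long, long[]>> bSmall =
        new JavaSparkContext(spark.sparkContext()).broadcast(small);

// Evaluate the conditions per record instead of joining
JavaRDD<Row> matched = spark.table("tablea").javaRDD().filter(row -> {
    long[] range = bSmall.value().get(row.getLong(0)); // column 0: id
    long time = row.getLong(1);                        // column 1: time
    return range != null && time >= range[0] && time <= range[1];
});
```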

However, this hurts the readability of the code, and if some tools compute results by running SQL directly, it also breaks the integrity of the program.

2.2 Use a UDF

A User-Defined Function (UDF) is a Hive function you define yourself. When the built-in Hive functions cannot fully meet business requirements, you need to write a custom function.

Here we build only the simplest kind of UDF: create a Hive function, and then, while querying the large table, call the method directly to return the required data.

You can start by defining a UDF class

```java
import java.util.Map;
import java.util.TreeMap;

import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.api.java.UDF2;

public class TripUDF implements UDF2<Long, Long, Long> {

    // id -> (time -> tag), built from the small table and broadcast to executors
    Map<Long, TreeMap<Long, Long>> map;

    public TripUDF(Broadcast<Map<Long, TreeMap<Long, Long>>> bmap) {
        this.map = bmap.getValue();
    }

    @Override
    public Long call(Long id, Long time) throws Exception {
        if (map.containsKey(id)) {
            // the nearest interval endpoints on either side of `time`
            Map.Entry<Long, Long> a = map.get(id).floorEntry(time);
            Map.Entry<Long, Long> b = map.get(id).ceilingEntry(time);
            if (null != a && null != b) {
                // both endpoints carry the same tag only when `time`
                // lies inside a single [timeStart, timeEnd] interval
                if (a.getValue().equals(b.getValue())) {
                    return a.getValue();
                }
            }
        }
        return -1L;
    }
}
```

This UDF takes the data of the small table, builds it into a TreeMap with the time ranges inside, and broadcasts it out. Then every row of the large table is matched by id and time in the query, and a successful match returns the desired result.
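For completeness, a minimal sketch of how such a broadcast map could be built from the small table; it assumes tableb carries a numeric tag per (timeStart, timeEnd) interval and that intervals for the same id do not overlap:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.Row;

// id -> (time -> tag); both endpoints of each interval map to the same tag,
// so floorEntry(time) and ceilingEntry(time) in the UDF agree exactly when
// `time` falls inside one interval.
Map<Long, TreeMap<Long, Long>> map = new HashMap<>();
for (Row r : spark.sql("select id, timeStart, timeEnd, tag from tableb").collectAsList()) {
    TreeMap<Long, Long> ranges = map.computeIfAbsent(r.getLong(0), k -> new TreeMap<>());
    ranges.put(r.getLong(1), r.getLong(3)); // timeStart -> tag
    ranges.put(r.getLong(2), r.getLong(3)); // timeEnd   -> tag
}

Broadcast<Map<Long, TreeMap<Long, Long>>> broadcast1 =
        new JavaSparkContext(spark.sparkContext()).broadcast(map);
```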

In SQL terms, what we need is for the time in the large table to fall within a time period of the small table:

```sql
tablea join tableb
  on tablea.id = tableb.id
 and tablea.time >= tableb.timeStart
 and tablea.time <= tableb.timeEnd
```

Then register the UDF with Spark:

```java
import org.apache.spark.sql.types.DataTypes;

String udfMethod = "structureMap";
// register with LongType to match the UDF's Long return type
spark.udf().register(udfMethod, new TripUDF(broadcast1), DataTypes.LongType);
```

Now you can query the large table directly and apply the UDF to the relevant fields to get the result:

```sql
select id, time, structureMap(id, time) as tag from tablea
```

The final tag is the same value the join with tableb would have produced, but the shuffle-heavy join is replaced by a broadcast lookup, and the rest of the execution is left to Spark to optimize.


3. Conclusion

If you like my articles, you can [follow ⭐] + [like 👍] + [comment 📃]. Your triple support is my motivation, and I look forward to growing with you~

You can also follow the WeChat official account “Huaijin Shake Yu Jia and Jia” to get resource downloads.