The three optimizations described in the previous three articles will significantly improve the speed of Spark job execution.

There are many other ways to optimize performance, but the improvement is usually not that great compared to the first three.

1. Use broadcast variables

Broadcast variables help when an operator function running inside a task references an external variable. Without broadcasting, every task that uses the variable gets its own copy, which is shipped to the worker nodes: on one hand this adds a lot of network transfer, and on the other hand each copy takes up memory (possibly even spilling to disk) while the task runs, and the extra reads and writes cost a lot of IO.

With a broadcast variable, there is initially only one copy, on the Driver. When a task needs the data in a broadcast variable, it first tries to get it from its local Executor's BlockManager. If it is not there, a copy is pulled remotely from the Driver (or possibly from the BlockManager of the nearest other node) and cached in the local BlockManager, after which all tasks on that Executor use the local copy directly.
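As a rough sketch, usage looks like this (the lookup-table contents and names are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val sc = new SparkContext(new SparkConf().setAppName("broadcast-example").setMaster("local[*]"))

// A large read-only map that every task needs; without broadcasting it
// would be serialized into the closure of every single task.
val largeLookupTable: Map[Int, String] = (1 to 100000).map(i => i -> s"value-$i").toMap

// Ship one copy per Executor instead of one copy per task.
val lookupBroadcast = sc.broadcast(largeLookupTable)

val result = sc.parallelize(1 to 1000)
  .map(id => lookupBroadcast.value.getOrElse(id, "missing"))  // read via .value inside the operator
  .collect()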

2. Serialize using Kryo

The default Spark serializer is Java serialization. Compared with Kryo, Java serialization is slower and produces larger serialized data, so switching to Kryo serialization can improve performance to a certain extent.

Kryo serialization takes effect in several places: external variables used in operator functions, data transferred during the shuffle process, and RDDs persisted at the StorageLevel.MEMORY_ONLY_SER level.

Usage:

SparkConf.set("spark.serializer"."org.apache.spark.serializer.KryoSerializer").registerKryoClasses(xxxx)
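For the persisted-RDD case, a minimal sketch (the input path is illustrative, and an existing SparkContext sc built from the Kryo-enabled SparkConf above is assumed):

import org.apache.spark.storage.StorageLevel

// With Kryo enabled, the serialized bytes stored by MEMORY_ONLY_SER are
// produced by Kryo, so the cached RDD takes noticeably less memory.
val cachedRdd = sc.textFile("hdfs:///path/to/input")
  .map(_.split(","))
  .persist(StorageLevel.MEMORY_ONLY_SER)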

3. Use the Fastutil utility class

Fastutil is a class library that extends the Java standard collections framework; it offers a smaller memory footprint and faster access. You can use some of the collections provided by Fastutil to store data in your Spark code.
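A small sketch of the idea, assuming the fastutil dependency (it.unimi.dsi:fastutil) is on the classpath and an existing SparkContext sc; the values are illustrative:

import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap

// A primitive-keyed map avoids boxing every Int into java.lang.Integer,
// so it uses less memory than a regular Map[Int, Int].
val counts = new Int2IntOpenHashMap()
counts.put(1, 10)
counts.put(2, 20)

// Such a structure can then be broadcast or used inside operator functions.
val countsBroadcast = sc.broadcast(counts)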

4. Adjust the waiting time for data localization

Before the Driver assigns the tasks of each stage to worker nodes, Spark works out which partition of data each task needs to process. Spark has its own task allocation algorithm: it prefers to assign a task to the node where its data already resides, which saves the performance cost of transferring the data over the network.

However, if that node is already full (no free cores, memory, or other resources), Spark waits for a period of time. If resources are released during this wait and the task's requirements can be met, the task is still assigned to that node; if the wait expires without that happening, the task is assigned to another node at a worse locality level.

The wait time is set using the spark.locality.wait configuration parameter.
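As a rough sketch, the wait can be raised globally or per locality level if the default is too short for your cluster (the "10s" values are only examples; note that very old Spark versions expect the value in milliseconds, e.g. "10000"):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.locality.wait", "10s")           // overall wait before downgrading locality
  .set("spark.locality.wait.process", "10s")   // wait before downgrading PROCESS_LOCAL to NODE_LOCAL
  .set("spark.locality.wait.node", "10s")      // wait before downgrading NODE_LOCAL to RACK_LOCAL
  .set("spark.locality.wait.rack", "10s")      // wait before downgrading RACK_LOCAL to ANY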

5. JVM tuning

A. Reduce the memory fraction reserved for cache operations: each Executor on a worker node in the Spark cluster runs in its own JVM. The JVM memory is divided into two parts: one part caches RDD data (cache/persist), and the other stores the objects created while operator functions run. By default the cache fraction is 0.6, which means only 40% of the memory is left for operator computation. If the computation creates too much data, minor GC is triggered frequently and can even escalate to full GC, which hurts performance badly. In jobs that rarely cache RDDs, this fraction can be lowered.

Adjust the parameter: spark.storage.memoryFraction
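For example, in a job that rarely caches RDDs (0.4 here is only an illustrative value, not a recommendation):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.storage.memoryFraction", "0.4")  // default 0.6; lowering it leaves more heap for operator execution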

B. Increase the off-heap memory of the Executor JVM process: sometimes, if your Spark job processes a very large amount of data, say hundreds of millions of records, the job fails with errors such as "shuffle file cannot find", "executor lost", "task lost" or "out of memory". In that case, consider increasing this parameter.

Adjust the parameter: --conf spark.yarn.executor.memoryOverhead, configured in the spark-submit shell script.

C. Increase the connection wait timeout: you may occasionally hit an error with no obvious pattern, reporting that some file with a UUID-like name (e.g. dsfsfd-2342vs-sdf-sdfsd) is not found, as if the file were lost. This is most likely an Executor on one worker pulling data from another node: the pull takes too long and exceeds the connection wait timeout.

Adjust the parameter: --conf spark.core.connection.ack.wait.timeout, configured in the spark-submit shell script.
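Since both of these parameters are passed in the submit script, a hedged sketch of what the spark-submit invocation might look like (the class name, jar name, and values are illustrative, not recommendations):

# Illustrative values only; tune them for your own cluster and workload.
spark-submit \
  --master yarn-cluster \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf spark.core.connection.ack.wait.timeout=300 \
  --class com.example.YourSparkJob \
  your-spark-job.jar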

For more information on off-heap memory and JVM memory allocation, see these two articles:

www.ibm.com/developerwo…

Blog.csdn.net/baolibin528…

6. Shuffle tuning

A. Merge output files on the map side: in the shuffle phase, each task of a stage creates one output file for every task of the next stage, for the next stage to pull. If the next stage has many tasks, the previous stage produces an extremely large number of map output files. In this case, enable map-side output file consolidation to reduce the number of files and improve shuffle performance.

Adjust the parameter: new SparkConf().set("spark.shuffle.consolidateFiles", "true")

B. Adjust the map-side buffer size and the reduce-side aggregation memory fraction: by default, the map-side write buffer is 32 KB and the reduce-side aggregation memory fraction is 0.2.

Adjust the parameters: spark.shuffle.file.buffer and spark.shuffle.memoryFraction
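A small sketch of adjusting both (64k and 0.3 are illustrative values, not recommendations):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.shuffle.file.buffer", "64k")     // map-side write buffer, default 32k
  .set("spark.shuffle.memoryFraction", "0.3")  // reduce-side aggregation memory fraction, default 0.2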

C. HashShuffleManager and SortShuffleManager: reference: my.oschina.net/hblt147/blo…

7. Operator tuning

Use mapPartitions instead of map to improve the performance of map-type operations; use coalesce after filter to reduce the number of (now partly empty) partitions; use foreachPartition instead of foreach to improve database write performance; use repartition to work around the low parallelism of stages produced by Spark SQL; and prefer reduceByKey over groupByKey wherever possible, because reduceByKey performs a local combine on the map side before the shuffle, which is much more efficient.
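A hedged sketch of two of these tips, assuming an existing SparkContext sc (the input path is illustrative, and saveBatchToDatabase is a hypothetical stand-in for real JDBC batch-insert code):

// reduceByKey combines values for the same key on the map side before the
// shuffle, so far less data crosses the network than with groupByKey.
val wordCounts = sc.textFile("hdfs:///path/to/text")
  .flatMap(_.split(" "))
  .map(word => (word, 1))
  .reduceByKey(_ + _)

// Hypothetical helper: one connection and one batched insert per partition.
def saveBatchToDatabase(batch: Seq[(String, Int)]): Unit = {
  // open a connection, addBatch each record, executeBatch, close
}

// foreachPartition writes a whole partition at a time instead of
// opening a database connection for every single record.
wordCounts.foreachPartition { records =>
  saveBatchToDatabase(records.toSeq)
}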