Spark performance tuning focuses on the following aspects:
- General performance tuning: resource allocation, parallelism, RDD architecture and caching, and so on.
- JVM tuning (Java Virtual Machine): JVM-related parameters. In general, if your hardware and underlying JVM configuration are reasonable, the JVM rarely causes serious performance problems. It matters more for troubleshooting: JVM problems can make an online Spark job run with errors or even fail outright (for example, OOM).
- Shuffle tuning (important): tuning the shuffle that Spark performs for operations such as groupByKey and reduceByKey. Shuffle tuning has a significant impact on Spark job performance. Experience shows that the shuffle can consume 50% to 90% of a job's running time; in extreme cases only about 10% goes to map-like operations and roughly 90% to the shuffles triggered by operators such as groupByKey and countByKey.
- Spark operator tuning (important): refactor the implementation around operators such as groupByKey, countByKey, or aggregateByKey. Some operators perform better than others, for example foreachPartition instead of foreach. Applied in the right situations, the effect is substantial.
A rough ranking by optimization impact:
- Allocation of resources, parallelism, RDD architecture and caching
- Shuffle tuning
- Spark operator tuning
- JVM tuning, broadcast large variables, Kryo, fastUtil
Preface
This series mainly explains:
- Performance tuning
- Allocate more resources
- Tuning parallelism
- Refactor the RDD architecture and persist the RDD
- Broadcast large variables
- Kryo serialization
- FastUtil optimization
- Adjust the wait time for data localization
- The JVM tuning
- Reduce the memory usage of cache operations
- Adjust executor off-heap memory
- Adjust the connection wait time
- Shuffle tuning
- Merge map-side output files
- Adjust the map-side memory buffer ratio and the reduce-side memory ratio
- SortShuffleManager tuning
- Operator tuning
- Use mapPartitions to improve performance
- After filter, use coalesce to reduce the number of partitions
- Optimize with foreachPartition
- Use repartition to solve Spark SQL’s low parallelism
- Use reduceByKey for local aggregation
Allocate more resources
The king of performance tuning is to allocate more resources: the improvement in performance and speed is obvious, and within a certain range, increasing resources is roughly proportional to the performance gain. After writing a complex Spark job, the first step of performance tuning, in my view, is to work out the optimal resource allocation. Only when your Spark job has reached the ceiling of what you can allocate (a company’s resources are limited) is it time to consider the other tuning techniques that follow.
What resources are allocated?
- Number of executors
- Number of CPU cores per executor
- Memory size per executor
- Driver memory size
Where are these resources allocated?
In the production environment, we submit the Spark job with the spark-submit shell script and adjust the corresponding parameters there:
```
/usr/local/spark/bin/spark-submit \
  --class cn.spark.sparktest.core.WordCountCluster \
  --num-executors 3 \
  --driver-memory 100m \
  --executor-memory 100m \
  --executor-cores 3 \
  /usr/local/SparkTest-0.0.1-SNAPSHOT-jar-with-dependencies.jar
```
- `--num-executors`: number of executors
- `--driver-memory`: driver memory (little impact)
- `--executor-memory`: memory per executor
- `--executor-cores`: CPU cores per executor
Why does the extra allocation of these resources improve performance?
Increase the number of executors
If the number of executors is small, only a few tasks can run in parallel, which means the parallelism of your Application is weak. For example, with three executors, each with two CPU cores, only six tasks can run in parallel; after those six finish, the next batch of six replaces them. Increasing the number of executors means more tasks can run in parallel: instead of six, you might run 10, 20, or 100 at once. Parallelism goes up by several times or tens of times, and accordingly performance (execution speed) improves by several to tens of times.
Increase the CPU core per executor
Increasing the CPU core per executor also increases the parallelism of execution. There were 20 executors with only two CPU cores each. The number of tasks that can be executed in parallel is 40 tasks. There are now five CPU cores per executor. The number of tasks that can be executed in parallel is 100 tasks. The speed of execution increased by 2.5 times.
Increase the amount of memory per executor
After increasing the amount of memory, performance can be improved in three ways:
- If you need to cache the RDD, then with more memory, you can cache more data and write less data to disk, or not at all. Reduced disk IO.
- For shuffle operations, the Reduce side requires memory to store pulled data and aggregate it. If the memory is insufficient, it is also written to disk. If you allocate more memory to executor, you have less data that needs to be written to disk, or not at all. Reduces disk I/O and improves performance.
- For task execution, many objects may be created. If the memory is small, it can cause the JVM heap to be full frequently, followed by frequent GC, garbage collection, minor GC, and Full GC. (Very slow). With more memory, less GC, garbage collection, less slowdowns, faster speeds.
Tuning parallelism
Parallelism: Refers to the number of tasks in each stage of the Spark job, which represents the parallelism of the Spark job in each stage.
What happens if parallelism is too low?
Suppose we have allocated enough resources to our Spark job in the spark-submit script: for example, 50 executors, each with 10 GB of memory and three CPU cores, and the resources of the cluster or YARN queue are at their upper limit.
But the number of tasks is not set, or is set too low, say 100 tasks. With three CPU cores per executor, any stage of your Application has a total of 150 CPU cores that can run tasks in parallel. With only 100 tasks, evenly divided, each executor gets 2 tasks, so only 100 tasks run at the same time and each executor runs only 2 tasks in parallel; each executor’s remaining CPU core is wasted.
You have enough resources allocated, but the problem is that the parallelism doesn’t match the resources, causing the resources you allocate to go to waste.
Reasonable parallelism setting
It should be large enough to make full use of your cluster resources. For example, if the cluster has 150 CPU cores and can run 150 tasks in parallel, you should set the parallelism of your Application to at least 150 so that the cluster resources are fully used and 150 tasks run in parallel. Raising the number of tasks to 150 also reduces the amount of data each task has to process: with 150 GB of data in total, 100 tasks means 1.5 GB per task, while 150 tasks running in parallel means roughly 1 GB per task.
Simply set the parallelism properly to make full use of your cluster computing resources and reduce the amount of data that each task needs to process. Ultimately, you can improve the performance and speed of your Spark job.
- Set the number of tasks to be at least the same as the total number of Spark Application CPU cores (ideally, for example, 150 CPU cores allocated with 150 tasks running at the same time).
- Set the number of tasks to 2-3 times the total number of Spark Application CPU cores; for example, with 150 CPU cores, set the number of tasks to 300 to 500.
In practice, contrary to the ideal case, some tasks run faster, say in 50 seconds, while others run slower, say a minute and a half. If the number of tasks equals the number of CPU cores, resources can still be wasted: with 150 tasks, 10 may finish first while 140 are still running, leaving 10 CPU cores idle. If the number of tasks is two or three times the total number of CPU cores, a freed CPU core is immediately taken by another task once one finishes, which keeps the Spark job as efficient and fast as possible.
How do I set the parallelism of a Spark Application?
spark.default.parallelism
```java
SparkConf conf = new SparkConf()
    .set("spark.default.parallelism", "500");
```
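For context, here is a minimal, self-contained sketch (class name, data, and values are illustrative, not from the original) showing both levers: the spark.default.parallelism property, and an explicit partition count passed to a shuffle operator, which overrides the default for that operation.
```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

public class ParallelismExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("ParallelismExample")
                // default number of tasks for shuffles (value is illustrative)
                .set("spark.default.parallelism", "500");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaPairRDD<String, Integer> pairs = sc
                .parallelize(Arrays.asList("a", "b", "a", "c"))
                .mapToPair(w -> new Tuple2<>(w, 1));

        // an explicit partition count on a shuffle operator overrides the default
        JavaPairRDD<String, Integer> counts = pairs.reduceByKey((x, y) -> x + y, 300);
        System.out.println(counts.collect());

        sc.stop();
    }
}
```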
Refactor the RDD architecture and persist the RDD
As in the first DAG above, by default, when operators are executed on the same RDD several times to produce different RDDs, that RDD and its parent RDDs are recomputed each time. So when computing RDD3 and RDD4, the HDFS file is read again and the operator on RDD1 is executed again, which means RDD2 is computed twice. This must absolutely be avoided: once an RDD is computed more than once, performance degrades sharply. For example, if HDFS->RDD1->RDD2 takes 15 minutes, going through it twice turns that into 30 minutes.
In the second DAG in the figure above, several different RDDs are derived from one RDD with exactly the same operators and computation logic, so that, through human negligence, multiple equivalent RDDs are computed. This should also be avoided.
How do I refactor the RDD architecture
1. When refactoring and optimizing the RDD architecture, reuse RDDs as much as possible. Similar RDDs can be extracted into a common RDD that is reused in subsequent computations.
2. A common RDD that is computed and used multiple times must be persisted. Persistence means caching the RDD's data in memory and/or on disk (via the BlockManager), so that no matter how many times the RDD is used afterwards, its persisted data is fetched directly from memory or disk instead of being recomputed (see the sketch after this list).
3. Persisting purely in memory may use too much memory and can even cause an OOM memory overflow. When pure memory cannot hold all of the common RDD's data, prefer serialized in-memory storage: the data of each RDD partition is serialized into one large byte array, a single object per partition. Serialization greatly reduces the memory footprint; its only drawback is that the data must be deserialized when it is read.
4. If you need high data reliability and memory is plentiful, you can use the double-replica persistence mechanism: each persisted data unit also keeps a copy on another node, so if one replica is lost (for example because a machine goes down), the other replica can be used without recomputation. Use this only if your memory resources are extremely abundant.
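A minimal sketch of points 1-3 (class name and paths are illustrative, not from the original): a common RDD is computed once, persisted with serialized in-memory storage, and then reused by two downstream computations.
```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.storage.StorageLevel;

public class ReuseRddExample {
    public static void run(JavaSparkContext sc) {
        // the common RDD, computed from HDFS only once
        JavaRDD<String> commonRdd = sc
                .textFile("hdfs:///path/to/input")        // illustrative path
                .filter(line -> !line.isEmpty());

        // persist before the RDD is used more than once; MEMORY_ONLY_SER trades a
        // little CPU (deserialization) for a much smaller memory footprint.
        // MEMORY_ONLY_SER_2 would additionally keep a second replica (point 4).
        commonRdd.persist(StorageLevel.MEMORY_ONLY_SER());

        // both downstream computations read the persisted data instead of re-reading HDFS
        long total = commonRdd.count();
        long distinct = commonRdd.distinct().count();
        System.out.println(total + " lines, " + distinct + " distinct");
    }
}
```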
Broadcast large variables
For more on broadcasting, read Spark (II) – Shared variables in RDD basics.
Why broadcast large variables
Variables used in operator functions are sent from the Driver to the Executor side with every task, so each task gets its own copy. If there are many tasks, many copies of the variable are shipped to the Executors, and if the variable is large this may even cause a memory overflow.
For example, suppose an external map variable is 1 MB, and your earlier tuning is all in place: resources and parallelism are well adjusted and you have 1,000 tasks running in parallel. Each task uses the 1 MB map, so 1,000 copies of it are sent over the network to the tasks, roughly 1 GB of data in total; that network transfer alone can eat a noticeable fraction of the job's running time. Once transferred, the copies also consume memory: one map is only 1 MB, but 1,000 of them spread across your cluster take roughly 1 GB at once. What does this do to performance? The unnecessary memory consumption means that RDDs you are persisting to memory may no longer fit completely and have to spill to disk, causing disk I/O in later operations; and when tasks create objects, the heap may not have enough room, triggering frequent garbage collection (GC). During GC the worker threads stop, that is, Spark stops working for a short time, and frequent GC has a significant impact on the speed of the Spark job.
How broadcast variables work
- When a variable is broadcast, there is initially one copy on the Driver.
- When a task wants to use the data in a broadcast variable, it first tries to obtain it from the BlockManager of its local Executor. If it is not there, a copy is pulled remotely from the Driver and stored in the local BlockManager, and subsequent tasks on that Executor use the local copy directly.
- The Executor's BlockManager may pull the copy not only from the Driver but also from the BlockManagers of other nodes, preferring the closest source. A usage sketch follows.
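A minimal usage sketch (names and data are illustrative, not from the original): the driver broadcasts the map once, and each task reads it through the broadcast handle instead of carrying its own copy in the task closure.
```java
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.broadcast.Broadcast;

import java.util.HashMap;
import java.util.Map;

public class BroadcastExample {
    public static void run(JavaSparkContext sc, JavaRDD<String> ids) {
        Map<String, String> lookup = new HashMap<>();
        lookup.put("1", "electronics");   // illustrative content
        lookup.put("2", "books");

        // one copy per Executor instead of one copy per task
        Broadcast<Map<String, String>> bcLookup = sc.broadcast(lookup);

        // tasks read the shared copy via value()
        JavaRDD<String> categories = ids.map(id -> bcLookup.value().getOrDefault(id, "unknown"));
        System.out.println(categories.collect());
    }
}
```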
Serialization using Kryo
- By default, Spark uses Java’s serialization mechanism, ObjectOutputStream/ObjectInputStream, to serialize objects. The advantage of this default mechanism is that it is easy to use: you don’t need to do anything manually, except that the variables used in your operators must implement Serializable.
- The disadvantage is that the default mechanism is inefficient: serialization is slow and the serialized data takes up relatively more memory. The serialization format can be optimized manually, and Spark supports the Kryo serialization mechanism, which is faster and produces smaller output than Java serialization, roughly one tenth of the size. With Kryo, less data is transferred over the network and much less memory is consumed in the cluster.
The Kryo serialization mechanism, once enabled, works in several places:
- External variables used in operator functions: with Kryo, network transmission performance is optimized and the cluster’s memory occupation and consumption are reduced.
- Serialization when persisting an RDD, for example with StorageLevel.MEMORY_ONLY_SER: optimizes memory usage and consumption. The less memory persisted RDDs consume, the more room tasks have to create objects, and the less GC occurs.
- Shuffle: optimizes network transmission performance.
How do I use Kryo
- Step 1: set the spark.serializer property in SparkConf to org.apache.spark.serializer.KryoSerializer:
```java
SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
```
Here is why Kryo is not the default serialization library: Kryo requires you to register your custom classes to reach its best performance (for example, if your operator functions use variables of external custom types, you must register those classes, otherwise Kryo cannot achieve its best performance).
- Step 2: register the custom classes that you need Kryo to serialize:
```java
SparkConf conf = new SparkConf()
    .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .registerKryoClasses(new Class[]{CategorySortKey.class});
```
Optimize the data format with fastUtil
fastutil is an extension of the Java collections framework (Map, List, Set and their implementations such as HashMap, ArrayList, HashSet). It provides type-specific maps, sets, lists, and queues with a smaller memory footprint and faster access. We use the fastutil collection classes to replace the JDK's native Map, List, and Set: they reduce memory usage, and when traversing a collection or getting and setting element values by index (or key), they provide faster access. fastutil also offers 64-bit arrays, sets, and lists, as well as high-performance, practical I/O classes for handling binary and text files. The latest versions of fastutil require Java 7 or above.
Usage scenarios
Scenarios where Fastutil is used in Spark:
- If an operator function uses an external variable: first, you can optimize it with a broadcast variable; second, you can use the Kryo serialization library to improve serialization performance and efficiency; third, if the external variable is some kind of large collection, consider rewriting it with fastutil, reducing the memory footprint at the source, and then reducing it further with broadcast variables and the Kryo serialization library.
- If the calculation logic executed by a task creates relatively large collections (maps, lists, and so on) that may occupy a lot of memory and involve performance-consuming traversal and access operations, the fastutil library can reduce, to some extent, the memory footprint of the collection types created by the task. This helps avoid frequently filling up executor memory and triggering GC, which would degrade performance.
The use of fastutil
Step 1: reference the fastutil package in pom.xml
```xml
<dependency>
    <groupId>fastutil</groupId>
    <artifactId>fastutil</artifactId>
    <version>5.0.9</version>
</dependency>
```
Step 2: replace List<Integer> with IntList
```java
// it.unimi.dsi.fastutil.ints.IntList / IntArrayList
IntList fastutilExtractList = new IntArrayList();
```
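A slightly fuller sketch of the substitution (class name and values are illustrative; it assumes the fastutil dependency above is on the classpath): a JDK Map<Integer, Integer> is replaced by fastutil's primitive-keyed Int2IntOpenHashMap, avoiding boxing and shrinking the memory footprint.
```java
import it.unimi.dsi.fastutil.ints.Int2IntMap;
import it.unimi.dsi.fastutil.ints.Int2IntOpenHashMap;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.ints.IntList;

public class FastutilExample {
    public static void main(String[] args) {
        // instead of List<Integer>
        IntList ids = new IntArrayList();
        ids.add(1);
        ids.add(2);
        ids.add(1);

        // instead of Map<Integer, Integer>: primitive int keys and values, no boxing
        Int2IntMap counts = new Int2IntOpenHashMap();   // missing keys default to 0
        for (int i = 0; i < ids.size(); i++) {
            int id = ids.getInt(i);                     // primitive accessor, no unboxing
            counts.put(id, counts.get(id) + 1);
        }
        System.out.println(counts);                     // e.g. counts for 1 and 2
    }
}
```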
Adjust the data localization duration
In general, Spark wants to assign each task to the node that holds the data it will process. Sometimes, however, the task has no chance to be placed on that node, typically because the node's computing resources are already occupied. In that case Spark usually waits for a period of time, 3s by default (the wait applies per localization level, not just once overall), and when it can wait no longer it chooses a worse localization level: for example, it assigns the task to a node close to the one holding the data, and performs the computation there.
In that second case, data transfer usually has to occur: the task obtains the data through the BlockManager of its own node, the BlockManager finds that the data is not available locally, and uses getRemote() to fetch it through the TransferService (the network data transfer component) from the BlockManager of the node where the data resides, sending it over the network to the node where the task runs.
We certainly don't want the second case. The best situation is that the task and the data are on the same node, and the data is fetched directly from the local Executor's BlockManager, from pure memory or with a bit of disk I/O. If data has to be sent over the network, performance definitely suffers; heavy network traffic, like disk I/O, is a performance killer.
When should this parameter be adjusted?
Observe the Spark job's run logs. It is recommended to test in client mode first so that all logs can be viewed locally. Look at the lines printed when tasks start (e.g. "Starting task ..."), which include the data localization level such as PROCESS_LOCAL or NODE_LOCAL, and observe the localization level of most tasks.
If most tasks are PROCESS_LOCAL, there is no need to adjust anything. If you find that many are NODE_LOCAL or ANY, it is worth adjusting the data localization wait time. After adjusting, observe the logs again to see whether the localization level of most tasks has improved, and whether the total running time of the Spark job has become shorter.
Don't put the cart before the horse: if the localization level improves but the job's running time increases because of the longer waits, then don't keep the adjustment.
How to adjust?
```java
new SparkConf()
    .set("spark.locality.wait", "10s");
```
- spark.locality.wait: 3s by default; try raising it to 6s or 10s.
By default, the following three wait times take the same value as the one above, i.e. 3s:
- spark.locality.wait.process
- spark.locality.wait.node
- spark.locality.wait.rack
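A minimal sketch (values are illustrative) of raising the wait for specific localization levels; the time unit is written explicitly, since unsuffixed values may be interpreted in milliseconds:
```java
import org.apache.spark.SparkConf;

public class LocalityWaitConf {
    public static SparkConf build() {
        return new SparkConf()
                // overall localization wait; also the default for the three per-level settings
                .set("spark.locality.wait", "6s")
                // per-level overrides: same process, same node, same rack
                .set("spark.locality.wait.process", "10s")
                .set("spark.locality.wait.node", "6s")
                .set("spark.locality.wait.rack", "3s");
    }
}
```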