0. Background

Last Thursday, I received feedback that some Spark jobs in our cluster were running slowly and failing frequently, and that tweaking parameters had neither improved their performance nor stopped the seemingly random errors.

Checking the job's execution history shows an average runtime of about 3 hours, highly unstable, with occasional failures:

1. Optimization approach

What determines a job's running time?

(1) Variation in input data volume

With fixed compute resources, a job's running time is positively correlated with the amount of data it processes. In this case the input volume is essentially stable, so fluctuations in log volume can be ruled out as the cause:

(2) Defects in the code logic itself

For example: repeatedly creating and initializing variables, environments, and RDDs inside the code; persisting data indiscriminately; and heavy use of shuffle operators such as reduceByKey and join. A small sketch of the first kind of anti-pattern follows.
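To make this concrete, here is a small, hypothetical sketch (not the actual job code; the HDFS path and field layout are invented): the source is read and parsed once and cached only because it is reused, instead of being rebuilt for every downstream computation.

```scala
import org.apache.spark.SparkContext

// Hypothetical sketch of the anti-pattern and its fix; the path and parsing
// logic are made up for illustration.
def buildCounts(sc: SparkContext): Unit = {
  // Defect: the same source RDD created (and the file re-read) once per use:
  //   val byUser  = sc.textFile("hdfs:///logs/day=2017-01-01").map(...).filter(...)...
  //   val byQuery = sc.textFile("hdfs:///logs/day=2017-01-01").map(...).filter(...)...

  // Better: build and parse it once, and persist it only because it is reused below.
  val parsed = sc.textFile("hdfs:///logs/day=2017-01-01")
    .map(_.split("\t"))
    .filter(_.length > 3)
    .cache()

  val byUser  = parsed.map(f => (f(0), 1L)).reduceByKey(_ + _)   // shuffle 1
  val byQuery = parsed.map(f => (f(1), 1L)).reduceByKey(_ + _)   // shuffle 2

  println(byUser.count() + " users, " + byQuery.count() + " queries")
}
```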

In this roughly 100-line job there are three shuffle operations, so the Spark driver splits the job into four stages that execute serially. The relevant code locations are as follows:

What we need to do is reduce shuffles and stages as much as possible from the algorithmic and business perspective, so that more of the computation can run in parallel. That is a big topic in itself, so I will not elaborate on it here beyond the sketch below.
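By way of example, one common trick for removing a shuffle (and therefore a stage boundary) is to turn a join against a small dimension table into a map-side lookup with a broadcast variable. This is a generic sketch, not taken from the job in question:

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// Generic sketch: replace a shuffle join with a map-side lookup via a broadcast.
def joinWithoutShuffle(sc: SparkContext,
                       large: RDD[(String, Long)],
                       smallDim: Map[String, String]): RDD[(String, (Long, String))] = {
  // large.join(sc.parallelize(smallDim.toSeq)) would shuffle both sides and add a stage.
  val dim = sc.broadcast(smallDim)            // ship the small side to every executor once
  large.flatMap { case (k, v) =>
    dim.value.get(k).map(d => (k, (v, d)))    // the "join" happens locally, no shuffle
  }
}
```

The same idea of avoiding wide dependencies also applies to preferring reduceByKey or aggregateByKey over groupByKey where the logic allows it.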

(3) Unreasonable parameter settings

This kind of tuning is fairly generic, so let's look at the job's original core parameter settings:

num-executors = 10 || 20, executor-cores = 1 || 2, executor-memory = 10 || 20, driver-memory = 20, spark.default.parallelism = 64

Suppose our Spark queue resources are as follows:

memory = 1T, cores = 400

Setting these parameters well takes some care. First, you need to understand how Spark allocates and uses resources:

With the default static (non-dynamic) resource allocation, Spark requests its resources up front and holds them until every task of the whole job has finished. For example, if you start a spark-shell on the gateway machine and neither exit it nor run any tasks, all of the requested resources stay occupied. (And if num-executors is set explicitly, dynamic resource allocation is disabled.)

Note that Spark allocates resources very differently from MapReduce/Hive; if you do not understand this, it will cause other problems when setting parameters.
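For reference, dynamic allocation is switched on with the settings below. The bounds shown are illustrative only, and as noted above, passing an explicit num-executors (spark.executor.instances) puts the job back on static allocation.

```scala
import org.apache.spark.SparkConf

// Illustrative bounds only; dynamic allocation also needs the external shuffle
// service running on the cluster so executors can be released safely.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "10")
  .set("spark.dynamicAllocation.maxExecutors", "50")
```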

For example, how many executor-cores are appropriate? If the allocation is unreasonable, the job either cannot get enough parallelism to finish on time, or it holds on to resources it never uses for its entire run; in this case, roughly two hundred gigabytes of memory were occupied for three hours.

So, given this task and the resources we have, how should these five core parameters be set?

1) executor_cores * num_executors should be neither too small nor too large! In general it should not exceed about 25% of the queue's total cores; with 400 cores in the queue, that means at most 100, and it should not drop below 40 unless the log volume is very small.

2) executor_cores should not be 1! Otherwise the number of task threads in the worker process is too small; 2 to 4 is generally recommended.

3) executor_memory is usually 6 to 10G and should not exceed 20G; beyond that, GC costs become too high or resources are simply wasted.

4) spark.default.parallelism is usually 1 to 4 times executor_cores * num_executors; here it was left at 64. If it is not set appropriately, either many tasks have to run in batches, or a large number of cores sit idle and resources are wasted.

5) driver-memory was set to 20G here, yet the driver performs no computation or storage; it only dispatches tasks and interacts with the YARN resource manager and the tasks themselves. Unless you are running a spark-shell, 1 to 2G is usually enough. A configuration in line with these rules is sketched below.
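Putting rules 1) to 5) together, a configuration that satisfies them for a 400-core queue might look like the following. The concrete numbers are illustrative choices, not necessarily the exact values the job ended up with:

```scala
import org.apache.spark.SparkConf

// Illustrative values that satisfy rules 1)-5) for a 400-core queue; they are
// not necessarily the final tuned configuration.
val conf = new SparkConf()
  .set("spark.executor.instances", "20")    // num-executors
  .set("spark.executor.cores", "4")         // 20 * 4 = 80 cores: between 40 and 100 (25% of 400)
  .set("spark.executor.memory", "8g")       // inside the 6-10G range
  .set("spark.driver.memory", "2g")         // the driver only schedules, so 1-2G is enough
  .set("spark.default.parallelism", "160")  // about 2x executor_cores * num_executors
```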

The Spark memory manager:

6) spark.shuffle.memoryFraction (0.2 by default), also known as execution memory. This region buffers the data needed by shuffles, joins, sorts, and aggregations so as to avoid frequent I/O. If your program performs many of these operations, it can be increased.

7) spark.storage.memoryFraction (0.6 by default), also known as storage memory. This region holds the block cache (data you persist via RDD.cache or RDD.persist), broadcast variables, and stored task results. If you call persistence operations or use broadcast variables heavily, it can be increased.

8) Other memory is reserved for the system, since the program itself needs memory to run (0.2 by default). Spark 1.6 also changed this so that at least 300M is always reserved; it can be overridden manually via spark.testing.reservedMemory. Subtracting reservedMemory from the actual available memory gives usableMemory. Execution memory and storage memory share usableMemory * 0.75, where 0.75 can be changed with the new spark.memory.fraction parameter. spark.memory.storageFraction defaults to 0.5, so by default the execution memory and storage memory described above split the available memory evenly.

For example, if we need to load large dictionary files, we can increase the executors' storage memory so that the global dictionary is not repeatedly swapped out, which also reduces GC; in that case we are trading memory for execution efficiency. The arithmetic behind these regions is sketched below.
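To make the arithmetic in 8) concrete, here is the unified-memory calculation for a hypothetical 10G executor, using the default fractions quoted above:

```scala
// Rough sketch of the Spark 1.6+ unified memory arithmetic from point 8),
// for a hypothetical 10G executor heap and the default fractions quoted above.
val executorMemory  = 10L * 1024 * 1024 * 1024      // executor-memory = 10G
val reservedMemory  = 300L * 1024 * 1024            // at least 300M reserved for the system
val usableMemory    = executorMemory - reservedMemory
val memoryFraction  = 0.75                          // spark.memory.fraction
val storageFraction = 0.5                           // spark.memory.storageFraction

val unified   = (usableMemory * memoryFraction).toLong
val storage   = (unified * storageFraction).toLong  // cache, broadcasts, task results
val execution = unified - storage                   // shuffles, joins, sorts, aggregations

def gb(bytes: Long): Double = bytes / (1024.0 * 1024 * 1024)
println(f"usable ${gb(usableMemory)}%.2fG, storage ${gb(storage)}%.2fG, execution ${gb(execution)}%.2fG")
```

With these defaults, storage and execution each end up with roughly 3.6G of a 10G executor heap.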

The final optimized parameters are as follows:

The effect is as follows:

(4) Analyzing performance bottlenecks from the execution logs

After tuning, the job still takes about an hour, so where does that hour go? In my experience, if a single day's data is not very large and no complex iterative computation is involved, it should not take more than half an hour.

Since the cluster's Spark History Server has not been installed and configured, the execution details of historical jobs cannot be viewed through the Spark web UI. So I wrote a small script to pull the timing of each step out of the logs, which makes it obvious at a glance which stage is the problem and allows targeted optimization.
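The script itself is not shown in the original post; below is a minimal sketch of the idea, assuming the driver log is available as a local file and contains the standard DAGScheduler lines of the form "ResultStage 3 (saveAsTextFile at ...) finished in 845.210 s":

```scala
import scala.io.Source

// Minimal sketch only; the author's actual script is not shown in the post.
// Assumes driver log lines like
//   "INFO DAGScheduler: ResultStage 3 (saveAsTextFile at Job.scala:87) finished in 845.210 s"
val stagePattern = """Stage (\d+) \((.+?)\) finished in ([\d.]+) s""".r
val source = Source.fromFile("driver.log")              // hypothetical log path
try {
  val timings = source.getLines().flatMap { line =>
    stagePattern.findFirstMatchIn(line).map(m => (m.group(1).toInt, m.group(2), m.group(3).toDouble))
  }.toList
  // Print the slowest stages first so the bottleneck is obvious at a glance.
  timings.sortBy(-_._3).foreach { case (id, site, secs) =>
    println(f"stage $id%-4d $secs%10.1f s   $site")
  }
} finally source.close()
```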

It turns out that, after the parameter tuning, the bottleneck is mainly in the final stage, which writes the results to Redis. Writing 2.5 billion results, about 60G of data, into Redis is a real challenge; it can only be improved by reducing the volume of data written or by reconsidering the choice of KV store.
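The original post does not show the write path, but a common pattern for bulk loading from Spark into Redis is to open one connection per partition and pipeline the writes. The sketch below assumes a plain Jedis client on the classpath; the host, port, TTL, and batch size are made-up values that would need tuning:

```scala
import org.apache.spark.rdd.RDD
import redis.clients.jedis.Jedis

// Hedged sketch: one connection per partition, pipelined writes, periodic flush.
// Host, port, TTL, and batch size are illustrative only.
def writeToRedis(results: RDD[(String, String)]): Unit = {
  results.foreachPartition { part =>
    val jedis = new Jedis("redis-host", 6379)          // hypothetical endpoint
    val pipe  = jedis.pipelined()
    var n = 0
    part.foreach { case (k, v) =>
      pipe.setex(k, 86400, v)                          // write with a TTL to bound memory
      n += 1
      if (n % 10000 == 0) pipe.sync()                  // flush in batches
    }
    pipe.sync()
    jedis.close()
  }
}
```

Writing per partition keeps the non-serializable connection object on the executors, and pipelining amortizes the network round trips.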

Of course, optimization and high performance is a broad and challenging topic in its own right. Beyond the code-level and parameter-level work above, there is also the question of how to prevent or mitigate data skew, which has to be analyzed against concrete scenarios and logs, so I will not expand on it here.

2. Some misconceptions of Spark beginners

To beginners, Spark can look all-powerful and endlessly fast. Some bloggers and technologists even treat Spark as the silver bullet of big data, replacing MapReduce, Hive, and Storm for batch processing, machine learning, real-time processing, and everything else in minutes. But is that really the case?

As the case above shows, knowing how to call Spark's API and knowing how to use Spark well are two different things. It requires understanding not only Spark's internals but also the business scenario, and matching the right technical solution and tools to that scenario; there is no silver bullet in this world...

To get good performance out of Spark, you need to make full use of the system's resources, especially memory and CPU. The core idea: if data fits in the memory cache, don't spill it to disk; if work can run in parallel, don't run it serially; if data can be processed locally, don't shuffle it.
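In code and configuration, that principle mostly comes down to deliberate choices like the following generic sketch (the values are illustrative, not taken from the job above):

```scala
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.storage.StorageLevel

// Generic sketch of "memory over disk, parallel over serial, local over shuffled".
val conf = new SparkConf()
  .set("spark.default.parallelism", "160")  // enough partitions to keep every core busy
  .set("spark.locality.wait", "3s")         // give the scheduler a chance to run tasks where the data lives

def reuseHotData(hot: RDD[String]): RDD[String] =
  hot.persist(StorageLevel.MEMORY_ONLY)     // keep reused data in memory instead of recomputing or spilling it
```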

Don't wield the dragon-slaying sabre only to use it for cutting fruit; that is just not elegant. 🙂
