Author: Wu Shujie, Development Engineer, Alibaba Cloud Intelligence

1. Overview

This topic describes how to tune MaxCompute Spark resources, with the goal of optimizing resource usage, maximizing resource utilization, and reducing costs while ensuring that Spark tasks run normally.

2. Sensor

  • Sensor provides a visual way to monitor running Spark processes. Each worker (Executor) and master (Driver) has its own status monitoring chart, which can be found in Logview, as shown in the figure below:

  • With Sensor enabled, you can see a chart like the following, showing the CPU and memory usage of a Driver/Executor over its lifetime:

  • cpu_plan/mem_plan (blue lines) represent the planned amount of CPU and memory requested by the user

  • The cpu_usage chart gives a visual view of CPU utilization while the task is running

  • mem_usage indicates the memory usage while the task is running; it is the sum of mem_rss and page cache

  • Memory Metrics

  • mem_rss indicates the resident memory occupied by a process, which is the memory actually used by the Spark task. If this value exceeds the amount of memory requested, the Driver or Executor process may be terminated. The curve can also be used to guide memory tuning: if actual usage is far less than what was requested, you can lower the memory request to maximize resource utilization and reduce costs.

  • mem_cache (page cache) is used to cache data from disk in memory, which reduces disk I/O. This memory is managed by the system; if the physical machine has ample free memory, mem_cache usage may be high, which is not a cause for concern.

3. Adjust resource parameters

(1) Executor Cores

Related parameters: spark.executor.cores

  • The number of cores per Executor, that is, the number of tasks that can run simultaneously in each Executor
  • The maximum parallelism of a Spark task is num-executors * executor-cores
  • When Spark tasks are executed, a CPU core can execute a maximum of one Task at a time. If you have a sufficient number of CPU cores, you can generally execute these tasks quickly and efficiently. It is also important to note that the memory of each Executor is shared by multiple tasks. If there are too many Executor cores and too little memory, then OOM is likely to occur.
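
For example, a minimal spark-defaults.conf sketch (the values are illustrative assumptions, not a recommendation for any particular job); with these settings the maximum parallelism from the formula above is 10 * 2 = 20:

    # 10 Executors x 2 cores each = up to 20 tasks running in parallel
    spark.executor.instances  10
    spark.executor.cores      2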

(2) Executor Num

Related parameters: spark.executor.instances

  • This parameter is used to set the total number of Executor processes used to execute Spark jobs
  • Users can usually decide how many executors to apply for based on the complexity of the task

In addition, note that if Executor disk space is insufficient or some Executors run into OOM problems, you can reduce the number of cores per Executor and increase the number of Executor instances; this keeps the overall parallelism of the task unchanged while reducing the risk of task failure.
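
For example, a sketch of this trade-off in spark-defaults.conf (illustrative values): both configurations give an overall parallelism of 40, but the second spreads the work across more, smaller Executors, which eases memory and disk pressure on each one:

    # Before: 10 Executors x 4 cores = parallelism 40
    # spark.executor.instances  10
    # spark.executor.cores      4

    # After: 20 Executors x 2 cores = parallelism 40, less pressure per Executor
    spark.executor.instances  20
    spark.executor.cores      2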

(3) Executor Memory

Related parameters: spark.executor.memory

  • This parameter is used to set the memory for each Executor process. In many cases, the amount of Executor memory directly determines the performance of a Spark job, and JVM OOM errors are also most common in Executors.

Related parameter 2: spark.executor.memoryOverhead

  • Sets the additional, off-heap memory for each Executor process, which covers JVM overhead, interned strings, NIO buffers, and so on. Note that memoryOverhead is not used for computation; neither user code nor Spark can operate on it directly.
  • If this value is not set, it defaults to spark.executor.memory * 0.10, with a minimum of 384 MB
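
A minimal illustration in spark-defaults.conf (the values are assumptions for the example): with 4g of heap and no explicit overhead, the default overhead would be max(4g * 0.10, 384 MB) ≈ 410 MB; setting it explicitly raises the off-heap headroom:

    spark.executor.memory          4g
    # Optional: explicit off-heap overhead; if omitted, defaults to max(0.10 * memory, 384 MB)
    spark.executor.memoryOverhead  1g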

Executor out-of-memory problems typically show up as follows:

  • Cannot allocate memory appears in the Executor log (Logview-> a Worker->StdErr)

  • The job has been killed by “OOM Killer”, please check your job’s memory usage.
  • Very high memory usage was found in the Sensor
  • java.lang.OutOfMemoryError: Java heap space appears in the Executor log
  • GC overhead limit exceeded appears in the Executor log
  • Frequent GC information is found in the Spark UI
  • Errors such as No route to host: workerd********* / Could not find CoarseGrainedScheduler appear

Possible causes and Solutions:

  • Limit Executor parallelism by reducing cores: multiple concurrently running tasks share the Executor's memory, which reduces the memory available to a single task; lowering parallelism eases memory pressure and increases the memory available to each task
  • Increase the number of partitions to reduce the load per executor
  • Check for data skew: skew can leave one task short of memory while other tasks have plenty
  • If the error is Cannot allocate memory or The job has been killed by “OOM Killer”, please check your job’s memory usage, it is usually caused by insufficient system memory. You can increase the off-heap memory to ease the memory pressure; setting spark.executor.memoryOverhead to 1g or 2g is usually enough
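
As a concrete, illustrative example of the first and last points above, the mitigation in spark-defaults.conf (or the DataWorks configuration item) might look like this; the values are assumptions, not universal recommendations:

    # Lower per-Executor concurrency and add off-heap headroom against the system OOM Killer
    spark.executor.cores           2
    spark.executor.memoryOverhead  2g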

(4) Driver Cores

Related parameter: spark.driver.cores

  • Generally, the Driver cores do not need to be large. However, if the job is complex (for example, too many stages and tasks), or there are too many Executors (the Driver needs to communicate with each Executor and maintain heartbeats), and Sensor shows high CPU utilization, you may need to increase the Driver cores appropriately

Note that when you run a Spark task in Yarn-cluster mode, the Driver resource configuration (cores/memory) cannot be set directly in code, because these parameters are needed when the JVM starts. They must be set through command-line options such as --driver-memory, in the spark-defaults.conf file, or in the DataWorks configuration item.
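
For instance, a hedged sketch (illustrative value) of setting the Driver cores in spark-defaults.conf, since this cannot be done in code:

    # Driver resources must be known before the JVM starts, so set them here or in DataWorks
    spark.driver.cores  2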

(5) Driver Memory

Related parameter 1: spark.driver.memory

  • Sets the heap memory requested for the Driver, similar to the Executor setting

Related parameter 2: spark.driver.maxResultSize

  • This parameter limits the total size of results of each Spark action (for example, collect). The default value is 1g. If the total size exceeds this limit, the job is aborted; however, if the limit is set too high, the Driver may run out of memory. Therefore, set an appropriate value based on the actual job.

Related parameter 3: spark.driver.memoryOverhead

  • Sets the off-heap memory for the Driver, similar to the Executor setting

  • The Driver's memory usually does not need to be large. If the Driver runs out of memory, it is usually because too much data is collected to the Driver. If you need to use the collect operator to pull all RDD data back to the Driver for processing, make sure the Driver's memory is large enough.
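
A small sketch that combines these Driver-side settings (the values are illustrative assumptions; set them in spark-defaults.conf or the DataWorks configuration item):

    spark.driver.memory          8g
    # Off-heap memory for the Driver, analogous to the Executor setting
    spark.driver.memoryOverhead  2g
    # Upper bound on the total results returned to the Driver (e.g. by collect)
    spark.driver.maxResultSize   2g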

Typical symptoms:

  • The Spark application does not respond or stops directly
  • An OutOfMemory error is found in the Driver log (Logview -> Master -> StdErr)
  • Frequent GC information is found in the Spark UI
  • Very high memory usage is observed in Sensor
  • Cannot allocate memory appears in the Driver log

Possible causes and Solutions:

  • The code may use the collect operation to collect a large dataset to the Driver node
  • The code creates an overly large array, or loads an overly large data set into the Driver process
  • SparkContext and DAGScheduler both run on the Driver side, and stage splitting of RDDs is also performed on the Driver. If the user's program has too many steps and is split into too many stages, this information consumes Driver memory, and the Driver memory needs to be increased; with too many stages, the Driver side may even run into a stack overflow

(6) Local disk space

Related parameter: spark.hadoop.odps.cupid.disk.driver.device_size

  • This parameter indicates the disk space requested for a single Driver or Executor. The default value is 20 GB. A maximum of 100 GB is supported
  • Shuffle data and data spilled from the BlockManager are stored on this disk

Insufficient disk space typically shows up as follows:

No space left on device error found in Executor/Driver log

Solution:

The simplest method is to request more disk space by increasing spark.hadoop.odps.cupid.disk.driver.device_size.

If the error persists after the size is increased to 100 GB, the cause may be data skew: during shuffle or cache, data is concentrated in certain blocks; or the amount of shuffle data handled by a single Executor is simply too large. You can try the following methods:

  • Repartition the data to resolve the data skew problem
  • Reduce the concurrency of a single Executor by lowering spark.executor.cores
  • Reduce the concurrency of table reads by adjusting spark.hadoop.odps.input.split.size
  • Increase the number of Executors by raising spark.executor.instances

Note:

  • Because the disk must be mounted before the JVM starts, this parameter must be configured in the spark-defaults.conf file or in the DataWorks configuration item; it cannot be configured in user code
  • In addition, note that the value of this parameter must carry the unit g, which cannot be omitted
  • In many cases, the parameter does not take effect because it is configured in the wrong place or is missing the unit g, and the task still fails
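
A hedged example of a correct configuration (the size is illustrative), placed in spark-defaults.conf or the DataWorks configuration item, with the unit g spelled out:

    # The disk is mounted before the JVM starts, so setting this in user code has no effect
    spark.hadoop.odps.cupid.disk.driver.device_size  50g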

4. Summary

In order to maximize resource utilization, it is recommended to request resources for a single worker at a ratio of 1:4, that is, 1 core : 4 GB of memory. If OOM occurs, check the logs and Sensor to locate the problem, and then optimize and adjust the resources accordingly. It is not recommended to set too many cores for a single Executor; 2-8 cores per Executor is usually relatively safe, and beyond that it is better to increase the number of instances instead. Increasing off-heap memory (reserving some memory for the system) is also a common tuning method and, in practice, solves many OOM problems. Finally, you can refer to the official documentation: spark.apache.org/docs/2.4.5/…
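
As a closing illustration, a worker configuration that follows the 1 core : 4 GB guideline (the numbers are assumptions, not a universal recommendation):

    # 1 core : 4 GB per Executor; 4 cores stays within the 2-8 core comfort zone
    spark.executor.instances       10
    spark.executor.cores           4
    spark.executor.memory          16g
    # Reserve some off-heap memory for the system
    spark.executor.memoryOverhead  4g
    spark.driver.cores             2
    spark.driver.memory            8g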

This article is original content of Alibaba Cloud and may not be reproduced without permission.