Spark Tuning

Spark tuning is routine work. Jobs in production run into a variety of problems: some have causes that exist before the job runs, some arise during execution, and some come from non-standard usage.

1. Allocate more resources

Allocating more resources is the king of performance tuning. Within a certain range, performance improves roughly in proportion to the resources added, so the gain in speed is obvious. After writing a complex Spark job, the first tuning step is therefore to find the best resource allocation. Only when the job already uses everything you are able to give it, because the company's resources are limited, is it time to consider the tuning techniques in the following sections.

Related questions:
(1) Which resources are allocated?
(2) Where are these resources set?
(3) Why does allocating more of them improve performance?

1.1 Resources to Be Allocated

executor-memory, executor-cores, driver-memory

1.2 Where to Set These Resources

In production, set these parameters in the spark-submit shell script used to submit the Spark job:

spark-submit \
 --master spark://node1:7077 \
 --class com.hoult.WordCount \
 --num-executors 3 \
 --driver-memory 1g \
 --executor-memory 1g \
 --executor-cores 3 \
 /export/servers/wordcount.jar

Here --num-executors sets the number of executors, --driver-memory the driver's memory (it rarely needs much), --executor-memory the memory of each executor, and --executor-cores the number of CPU cores of each executor. Note that --num-executors is honored on YARN; in standalone mode the executor count is instead bounded by --total-executor-cores.

1.2 Parameter adjustment to the maximum value

  • Standalone mode
Work out the memory and CPU cores of every node in the Spark cluster. For example, with 20 worker nodes in total, each with 8 GB of memory and 10 CPU cores, the job can be given 20 executors, each with 8 GB of memory and 10 cores.
  • YARN mode
Work out the total resources of the YARN cluster. For example, with 500 GB of memory and 100 CPU cores in total, the job can be given at most 50 executors, each with 10 GB of memory and 2 cores.
  • Guiding principle
When resources are plentiful, use as many of them as possible and push the settings toward the maximum.
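The arithmetic behind the two examples above can be sketched in a few lines of Scala. `ExecutorSizing` and `maxLayout` are hypothetical names, not Spark APIs, and the sketch assumes a deliberately simple model: it ignores per-executor memory overhead and anything reserved for the operating system or the driver.

```scala
// Hypothetical helper, not part of Spark: derives an executor layout from cluster totals.
object ExecutorSizing {
  final case class Layout(numExecutors: Int, memoryGbEach: Int, coresEach: Int)

  // The executor count is bounded by whichever resource runs out first.
  def maxLayout(totalMemGb: Int, totalCores: Int, memGbEach: Int, coresEach: Int): Layout = {
    val byMemory = totalMemGb / memGbEach
    val byCores = totalCores / coresEach
    Layout(math.min(byMemory, byCores), memGbEach, coresEach)
  }

  def main(args: Array[String]): Unit = {
    // YARN example from the text: 500 GB and 100 cores in total.
    println(maxLayout(totalMemGb = 500, totalCores = 100, memGbEach = 10, coresEach = 2))
    // Standalone example from the text: 20 workers with 8 GB and 10 cores each.
    println(maxLayout(totalMemGb = 20 * 8, totalCores = 20 * 10, memGbEach = 8, coresEach = 10))
  }
}
```

Both calls reproduce the figures in the text: 50 executors for the YARN example and 20 for the standalone one.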

1.3 Why Can The Performance Be Improved after Resource Scaling

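--executor-memory: more memory per executor lets more RDD data be cached and more shuffle data be held in memory, so less is spilled to disk and garbage collection runs less often.

--total-executor-cores: more cores let more tasks run at the same time, which shortens each stage.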

2. Improve parallelism

2.1 What does Spark’s parallelism mean

The number of tasks in each stage of a Spark job is the parallelism of the job in that stage. Once you have allocated as many resources as you can, adjust the parallelism of the program to match them; if the parallelism is too low for the resources, the extra resources are wasted. Raising the parallelism also reduces the amount of data each task has to process. A suitable degree of parallelism therefore makes full use of the cluster, shrinks the work per task, and speeds up the job.

2.2 How to improve parallelism

2.2.1 You can set the number of tasks

(1) At a minimum, set the number of tasks equal to the total number of CPU cores of the Spark application. In the ideal case, with 150 cores and 150 tasks, all tasks run at the same time and finish at about the same time.

(2) The Spark tuning guide recommends 2~3 tasks per CPU core, that is, 2~3 times the total number of CPU cores of the Spark application, for example 300~500 tasks for 150 cores. Reality differs from the ideal: some tasks finish quickly, say in 50 seconds, while others take a minute and a half. If the number of tasks exactly equals the number of cores, resources can be wasted: when 10 of the 150 tasks finish first and the other 140 are still running, 10 cores sit idle. With 2~3 times as many tasks, a finished task is immediately replaced by a waiting one, so the cores are kept busy as much as possible and the Spark job runs faster.

2.2.2 How to set the number of Tasks to improve parallelism

Set the parameter spark.default.parallelism. It has no value by default; once set, for example to 10, it takes effect during shuffle. For example, with val rdd2 = rdd1.reduceByKey(_+_), the number of partitions of rdd2 is 10. It can be set on the SparkConf object, for example: new SparkConf().set("spark.default.parallelism", "500")

2.2.3 Resetting the Number of Partitions in the RDD

Call rdd.repartition to repartition the RDD. It returns a new RDD with the requested, larger number of partitions. Since one partition corresponds to one task, more partitions mean more tasks, which raises the parallelism.
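The two settings from 2.2.2 and 2.2.3 can be seen side by side in a small sketch. It assumes a local master (`local[2]`) purely for illustration; the object name, app name, and sample data are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ParallelismSketch {
  // Returns (partitions after reduceByKey, partitions after repartition).
  def run(): (Int, Int) = {
    val conf = new SparkConf()
      .setMaster("local[2]") // assumption: local run for illustration only
      .setAppName("parallelism-sketch")
      .set("spark.default.parallelism", "10")
    val sc = new SparkContext(conf)
    try {
      val rdd1 = sc.parallelize(Seq("a" -> 1, "b" -> 2, "a" -> 3), numSlices = 2)
      // No explicit partition count: the shuffle falls back to spark.default.parallelism.
      val rdd2 = rdd1.reduceByKey(_ + _)
      // repartition returns a new RDD with the requested number of partitions.
      val rdd3 = rdd2.repartition(20)
      (rdd2.getNumPartitions, rdd3.getNumPartitions)
    } finally sc.stop()
  }
}
```

With these settings, `ParallelismSketch.run()` should report 10 partitions after the shuffle and 20 after the explicit repartition.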

2.2.4 Increasing the number of Tasks run in SparkSQL

Spark.apache.org/docs/2.3.3/…

Set the parameter spark.sql.shuffle.partitions. The default is 200; it can be raised as appropriate to increase parallelism, for example spark.sql.shuffle.partitions=500.

This parameter applies only to Spark SQL.
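A minimal sketch of how the setting shows up in a Spark SQL job follows. The local master, object name, and sample data are assumptions. Adaptive query execution is switched off here only so that the configured partition count stays visible; newer Spark versions may otherwise coalesce shuffle partitions at run time.

```scala
import org.apache.spark.sql.SparkSession

object SqlShufflePartitionsSketch {
  // Returns the number of partitions produced by the aggregation's shuffle.
  def run(): Int = {
    val spark = SparkSession.builder()
      .master("local[2]") // assumption: local run for illustration only
      .appName("sql-shuffle-partitions-sketch")
      .config("spark.sql.shuffle.partitions", "500")
      .config("spark.sql.adaptive.enabled", "false")
      .getOrCreate()
    try {
      import spark.implicits._
      val df = Seq("a" -> 1, "b" -> 2, "a" -> 3).toDF("key", "value")
      // groupBy introduces a shuffle, which uses spark.sql.shuffle.partitions.
      df.groupBy("key").sum("value").rdd.getNumPartitions
    } finally spark.stop()
  }
}
```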

3. RDD reuse and persistence

3.1 Description of actual development situations

The calculation logic shown in the figure above is as follows:
(1) When rdd2 is first used to derive rdd3, the computation starts from rdd1: the file on HDFS is read, the operator on rdd1 produces rdd2, and rdd3 is then computed from rdd2.
(2) Likewise, to compute rdd4, all of the preceding logic is computed again.
(3) By default, when operators are applied to the same RDD several times to derive different RDDs, that RDD and its parent RDDs are recomputed each time. This pattern is common in real code, but recomputing an RDD many times must be avoided because it can degrade performance dramatically.

Conclusion: an RDD that is used several times, that is, a shared RDD, should be persisted so that later uses do not recompute it.

3.2 How to Persist an RDD

  • Call the cache or persist method of the RDD.
(1) The cache method persists data in memory by default, for example rdd.cache; under the hood it calls persist.
(2) The persist method offers a rich set of storage levels, defined in the StorageLevel object, so you can choose one that fits the scenario. For example, rdd.persist(StorageLevel.MEMORY_ONLY) is exactly what cache does.

3.3 Serialization can be used for RDD persistence

(1) Persisting plain (deserialized) objects in memory may use too much memory and cause an OOM (out-of-memory) error.
(2) When memory alone cannot hold the whole shared RDD, prefer the serialized in-memory level. The data of each RDD partition is serialized into a byte array, which greatly reduces the memory footprint.
(3) The only drawback of serialization is that the data must be deserialized when it is read back. In exchange it takes less space and is easier to send over the network.
(4) If the serialized in-memory level still causes OOM, fall back to disk, starting with the plain memory + disk level (no serialization).
(5) When high data reliability is required and memory is plentiful, use a replicated level. With a single persisted copy, if the machine holding it crashes, the copy is lost and must be recomputed. With replication, each persisted unit is also stored on another node for fault tolerance, so when one copy is lost the other is used without recomputation. Use this only when memory is extremely abundant. For example: StorageLevel.MEMORY_ONLY_2
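To make the reuse argument of sections 3.1 to 3.3 concrete, here is a minimal sketch that persists a shared RDD at a serialized storage level and runs two actions on it. The local master, the object name, and the accumulator used to count evaluations are assumptions added for illustration; accumulator counts inside transformations are only indicative, since Spark may re-run tasks.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

object PersistSketch {
  // Returns (sum of rdd2, count of multiples of 4 in rdd2, number of map evaluations).
  def run(): (Long, Long, Long) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("persist-sketch"))
    try {
      val evaluations = sc.longAccumulator("rdd2 evaluations")
      val rdd1 = sc.parallelize(1 to 100, numSlices = 4)
      val rdd2 = rdd1.map { x => evaluations.add(1); x * 2 }

      // Shared RDD: keep it in memory in serialized form.
      rdd2.persist(StorageLevel.MEMORY_ONLY_SER)

      val total = rdd2.map(_.toLong).reduce(_ + _) // first action computes and stores rdd2
      val fours = rdd2.filter(_ % 4 == 0).count()  // second action reads the persisted copy
      (total, fours, evaluations.sum)
    } finally sc.stop()
  }
}
```

In a local run without task retries, the map function is evaluated 100 times in total; removing the `persist` call would double that, because the second action recomputes rdd2 from rdd1.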

4. Use of broadcast variables

4.1 Scenario Description

In practice, a stage may contain a large number of tasks, say 1000, because of the volume of data to process, and every task may need the same data for its business logic. If that data is 100 MB in size, it is copied 1000 times and sent over the network to each task. This involves heavy network transfer overhead and takes at least 1000 * 100 MB = 100 GB of memory, which is a very large cost.

The unnecessary memory consumption has further consequences. When an RDD is persisted to memory, it may no longer fit completely and has to be written to disk, so later operations pay for disk I/O; this is a disaster for Spark job processing. And because so much memory is occupied, the heap may not have room for all the objects a task creates, which triggers frequent garbage collection (GC). GC pauses the worker threads, so Spark stops working for a moment; frequent GC has a considerable impact on how fast a Spark job runs.

4.2 Introduction of broadcast variables

When Spark code runs in a distributed way, it has to be shipped to the tasks on each executor. For read-only, fixed data, having the driver send it to every task each time is inefficient. With a broadcast variable, the data is sent only once to each executor, and every task on that executor reads it from the node's BlockManager (which manages the executor's memory and disk data) instead of fetching it from the driver, which improves efficiency.

A broadcast variable initially has one copy on the driver, where the shared data is converted into a broadcast variable. When a task needs the data, it first looks for a copy in the BlockManager of its own executor. If there is none, a copy is pulled remotely from the driver and stored in the local BlockManager; from then on, tasks on that executor use the local copy directly. All tasks in an executor therefore share one copy: the executor fetches the data only when its first task needs it, and all later tasks read it from the local BlockManager. Besides pulling from the driver, an executor's BlockManager may also pull a copy from the BlockManager of another node; the closer that node is on the network, the better.

4.3 Performance analysis after using broadcast variables

Suppose a job needs 50 executors and 1000 tasks, and the shared data is 100 MB.
(1) Without a broadcast variable, each of the 1000 tasks needs its own copy, so there are 1000 copies, which costs a lot of network transfer and memory. Memory consumed: 1000 * 100 MB = 100 GB.

(2) With a broadcast variable, 50 executors need only 50 copies, and not all of them are transferred from the driver: an executor may pull its copy from the BlockManager of an executor on a nearby node, which greatly speeds up network transfer. Memory overhead: 50 * 100 MB = 5 GB.

Summary: the memory overhead is 100 GB without broadcast variables and 5 GB with them, a 20-fold difference in network transfer and memory cost, so the improvement from broadcast variables is considerable. That said, broadcast variables do not necessarily have a decisive effect on performance: a Spark job that runs for 30 minutes may finish 2 or 5 minutes sooner. But many small gains add up, and they do pay off in the end.
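The comparison above is plain multiplication, which a short sketch makes explicit. `BroadcastMemoryEstimate` is a hypothetical helper, not a Spark API, and it follows the text in treating 1000 MB as 1 GB.

```scala
// Hypothetical helper, not part of Spark: total memory held by N copies of a data set.
object BroadcastMemoryEstimate {
  def totalMb(copies: Int, sizeMb: Int): Long = copies.toLong * sizeMb

  def main(args: Array[String]): Unit = {
    val perTask = totalMb(copies = 1000, sizeMb = 100)   // one copy per task
    val perExecutor = totalMb(copies = 50, sizeMb = 100) // one copy per executor
    println(s"without broadcast: $perTask MB, with broadcast: $perExecutor MB")
    println(s"reduction factor: ${perTask / perExecutor}")
  }
}
```

The factor depends only on the ratio of tasks to executors, here 1000 / 50 = 20.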

4.4 Precautions for using Broadcast variables

(1) Can an RDD be broadcast with a broadcast variable? No, because an RDD does not hold data. The result computed from an RDD can be broadcast.
(2) Broadcast variables can only be defined on the driver, not on executors.
(3) The value of a broadcast variable can only be changed on the driver side; executors cannot modify it.
(4) If executors use a driver-side variable without broadcasting it, each executor holds as many copies of the variable as it has tasks.
(5) If executors use a driver-side variable through a broadcast variable, each executor holds only one copy.

4.5 How to Use broadcast Variables

  • For example:
(1) The broadcast method of SparkContext converts data into a broadcast variable of type Broadcast:
val broadcastArray: Broadcast[Array[Int]] = sc.broadcast(Array(1, 2, 3, 4, 5, 6))
(2) The BlockManager of each executor can then pull a copy of the broadcast variable to get the actual data. The value is obtained by calling the value method:
val array: Array[Int] = broadcastArray.value

5. Avoid using shuffle operators

5.1 the shuffle description

Shuffle in Spark involves a large amount of data transfer over the network: tasks in a downstream stage pull the output of tasks in the upstream stage across the network. Put simply, shuffle pulls the records that share a key from many nodes of the cluster onto one node, where operations such as aggregation or join are performed. reduceByKey and join, for example, both trigger a shuffle. Avoid shuffle operators where possible, because shuffle is the most performance-hungry part of a Spark job.

5.2 Which Operator Operations Cause Shuffle

When developing Spark programs, operators such as reduceByKey, join, distinct, and repartition trigger a shuffle. Because shuffle is so expensive, prefer non-shuffle operators of the map family where possible. Spark jobs with no shuffle, or fewer shuffles, have far lower performance overhead.

5.3 How Do I Avoid Generating shuffle

  • A small example
// Inefficient:
// A traditional join causes a shuffle.
// Records with the same key in the two RDDs must be pulled over the network to the same node and joined by one task.
val rdd3 = rdd1.join(rdd2)

// Better:
// A broadcast + map join does not cause a shuffle.
// Use a broadcast variable to ship the smaller RDD's data to every executor.
val rdd2Data = rdd2.collect()
val rdd2DataBroadcast = sc.broadcast(rdd2Data)

// Inside the rdd1.map operator, all of rdd2's data is available from rdd2DataBroadcast.
// Iterate over it: if a record of rdd2 has the same key as the current record of rdd1, the two can be joined.
// Then combine the current rdd1 record with the matching rdd2 record (as a String or Tuple) in whatever way you need.
val rdd3 = rdd1.map(rdd2DataBroadcast...)

// Note: this approach is recommended only when rdd2 is small (for example a few hundred MB, or one or two GB),
// because the memory of every executor holds a full copy of rdd2's data.
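One way to fill in the broadcast + map join outlined above is sketched below. It is an illustration under stated assumptions, not the only possible form: the small side is assumed to have unique keys (so it can be collected as a map), inner-join semantics are assumed, and the local master, object name, and sample records are made up.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object BroadcastJoinSketch {
  def run(): List[(Int, (String, String))] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("broadcast-join-sketch"))
    try {
      val rdd1 = sc.parallelize(Seq(1 -> "order-a", 2 -> "order-b", 3 -> "order-c"))
      val rdd2 = sc.parallelize(Seq(1 -> "alice", 2 -> "bob")) // the small side

      // Collect the small RDD to the driver as a map and broadcast it.
      val small = sc.broadcast(rdd2.collectAsMap())

      // Inner join without a shuffle: look each rdd1 key up in the broadcast map
      // and drop records that have no match.
      val joined = rdd1.flatMap { case (k, v) =>
        small.value.get(k).map(w => (k, (v, w))).iterator
      }
      joined.collect().sortBy(_._1).toList
    } finally sc.stop()
  }
}
```

Using a map for the lookup avoids scanning the whole broadcast array for every record of rdd1; if the small side can contain duplicate keys, a map from key to a list of values would be needed instead.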

5.4 Using Map-side Pre-aggregation for Shuffle Operations

  • Map-side pre-aggregation
If a shuffle is unavoidable and cannot be replaced with map-type operators, use operators that support map-side pre-aggregation. Map-side pre-aggregation means aggregating records with the same key locally on each node, much like the local combiner in MapReduce. After it, each node holds only one record per key, because records with the same key have been merged. When other nodes pull the records for a key from all nodes, far less data has to be pulled, which reduces disk I/O and network transfer.

In general, replace groupByKey with reduceByKey or aggregateByKey where possible. Both reduceByKey and aggregateByKey apply the user-defined function to pre-aggregate records with the same key locally on each node, whereas groupByKey does no pre-aggregation: the full data set is distributed and transferred between the nodes of the cluster, so performance is comparatively poor.

The two figures below show word count implemented with groupByKey and with reduceByKey respectively. In the groupByKey figure, no local aggregation takes place and all data is transferred between cluster nodes. In the reduceByKey figure, records with the same key are pre-aggregated locally on each node before being sent to other nodes for global aggregation.
  • Figure: how word count works with groupByKey

  • Figure: how word count works with reduceByKey

6. Use high-performance operators

6.1 Use reduceByKey/aggregateByKey instead of groupByKey

  • reduceByKey/aggregateByKey perform pre-aggregation, which reduces the amount of data transferred and improves performance

  • groupByKey performs no pre-aggregation and pulls the full data set, so performance is lower
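The word-count comparison can be written out as a short sketch. Both variants return the same counts; the difference lies in how much data crosses the shuffle. The local master, object name, and sample words are assumptions.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object WordCountSketch {
  // Returns the counts computed with reduceByKey and with groupByKey.
  def run(): (Map[String, Int], Map[String, Int]) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("word-count-sketch"))
    try {
      val pairs = sc.parallelize(Seq("a", "b", "a", "c", "b", "a"), numSlices = 2)
        .map(word => (word, 1))

      // reduceByKey merges values per key inside each partition before the shuffle.
      val reduced = pairs.reduceByKey(_ + _).collect().toMap

      // groupByKey ships every (word, 1) pair across the shuffle, then sums.
      val grouped = pairs.groupByKey().mapValues(_.sum).collect().toMap

      (reduced, grouped)
    } finally sc.stop()
  }
}
```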

6.2 Replace map with mapPartitions

Operators of the mapPartitions family process all the data of a partition in one function call instead of one record per call, so performance is comparatively high. However, mapPartitions can sometimes cause OOM (out-of-memory) errors. Because a single function call handles all the data of a partition, if memory is short, garbage collection cannot reclaim enough objects and an OOM exception is likely. So use this kind of operator with care.
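A minimal mapPartitions sketch follows. `Formatter` is a hypothetical stand-in for any per-partition resource that is costly to create; the local master and names are assumptions. The function returns a lazy iterator, which is one way to limit the OOM risk mentioned above, because records stream through instead of being buffered for the whole partition.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object MapPartitionsSketch {
  // Hypothetical stand-in for an expensive resource such as a parser or a client.
  final class Formatter { def format(x: Int): String = f"id-$x%03d" }

  def run(): List[String] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("map-partitions-sketch"))
    try {
      val rdd = sc.parallelize(1 to 6, numSlices = 2)
      // The function runs once per partition, so one Formatter serves all of its records.
      val formatted = rdd.mapPartitions { records =>
        val formatter = new Formatter
        records.map(formatter.format) // lazy: no full-partition buffer is built
      }
      formatted.collect().toList
    } finally sc.stop()
  }
}
```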

6.3 Replace foreach with foreachPartition

The principle is the same as for replacing map with mapPartitions: all the data of a partition is processed in one function call instead of one record per call. In practice, foreachPartition helps performance a great deal. Suppose the function passed to foreach writes every record of the RDD to MySQL. With the plain foreach operator, records are written one at a time, and each function call may create a database connection, so connections are created and destroyed constantly and performance is very low. With foreachPartition, the data of a whole partition is handled at once: only one database connection is created per partition and the records are inserted in batches, which is much faster. In practice, when writing about 10,000 records to MySQL, performance improved by more than 30%.
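The one-connection-per-partition pattern can be sketched without a real database. `FakeConnection` below is a hypothetical stand-in for a JDBC connection with batching; a real job would open the connection with its own driver and credentials. The accumulators, local master, and names are assumptions used only to make the effect observable.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ForeachPartitionSketch {
  // Hypothetical stand-in for a database connection that supports batched writes.
  final class FakeConnection {
    private var buffered = 0
    def addToBatch(row: Int): Unit = buffered += 1
    def executeBatch(): Int = { val written = buffered; buffered = 0; written }
    def close(): Unit = ()
  }

  // Returns (connections opened, rows written).
  def run(): (Long, Long) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("foreach-partition-sketch"))
    try {
      val connectionsOpened = sc.longAccumulator("connections opened")
      val rowsWritten = sc.longAccumulator("rows written")

      sc.parallelize(1 to 10000, numSlices = 4).foreachPartition { rows =>
        val conn = new FakeConnection // one connection per partition, not per record
        connectionsOpened.add(1)
        try {
          rows.foreach(conn.addToBatch)
          rowsWritten.add(conn.executeBatch()) // one batched write per partition
        } finally conn.close()
      }
      (connectionsOpened.sum, rowsWritten.sum)
    } finally sc.stop()
  }
}
```

With 4 partitions, the sketch opens 4 connections for 10,000 rows; the same logic inside a plain foreach would open one per record.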

6.4 Use coalesce after filter

After a filter operator has removed a large share of an RDD's data (for example, more than 30%), it is advisable to call coalesce to reduce the number of partitions manually and compact the data into fewer partitions. After filter, each partition has lost much of its data; if the computation simply continues, each task processes only a small amount of data, which wastes resources, and the more tasks there are, the slower the job may become. Reducing the number of partitions with coalesce compacts the data so that fewer tasks can process all of it. In some scenarios this improves performance.
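A short sketch of filter followed by coalesce, assuming a local master and made-up numbers: a filter that keeps about 10% of the records, after which 100 partitions are compacted into 10.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceSketch {
  // Returns (partitions after filter, partitions after coalesce, remaining records).
  def run(): (Int, Int, Long) = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("coalesce-sketch"))
    try {
      val rdd = sc.parallelize(1 to 1000, numSlices = 100)
      val filtered = rdd.filter(_ % 10 == 0) // keeps about 10% of the records
      // coalesce merges partitions without a shuffle when reducing their number.
      val compacted = filtered.coalesce(10)
      (filtered.getNumPartitions, compacted.getNumPartitions, compacted.count())
    } finally sc.stop()
  }
}
```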

6.5 Use repartitionAndSortWithinPartitions instead of repartition followed by sort

repartitionAndSortWithinPartitions is an operator recommended on the Spark website. The official advice is that if you need to sort after repartitioning, you should use repartitionAndSortWithinPartitions directly, because it sorts while it performs the repartitioning shuffle. Doing the shuffle and the sort in one pass is likely to perform better than shuffling first and sorting afterwards.
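The operator can be tried on a handful of keys. The sketch assumes a local master, integer keys, and a `HashPartitioner` with 2 partitions; the sample values are made up.

```scala
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

object RepartitionSortSketch {
  // Returns the keys of each output partition, in partition order.
  def run(): List[List[Int]] = {
    val sc = new SparkContext(
      new SparkConf().setMaster("local[2]").setAppName("repartition-sort-sketch"))
    try {
      val pairs = sc.parallelize(Seq(5, 2, 8, 1, 4, 7, 3, 6), numSlices = 4)
        .map(k => (k, s"v$k"))
      // One shuffle both routes each record to its partition and sorts by key within it.
      val sorted = pairs.repartitionAndSortWithinPartitions(new HashPartitioner(2))
      sorted.glom().collect().map(_.map(_._1).toList).toList
    } finally sc.stop()
  }
}
```

With a hash partitioner over 2 partitions, even keys land in partition 0 and odd keys in partition 1, each partition sorted in ascending key order.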

7. Optimize serialization performance with Kryo

7.1 Spark serialization Introduction

Spark jobs transfer data between processes over the network and persist data, both of which require serialization. By default, Spark uses the Java serializer. Its advantages and disadvantages are:
Advantage: it is easy to use and needs no manual setup, apart from having the classes of the objects and variables you use implement the Serializable interface.
Disadvantages: it is inefficient; serialization is slow and the serialized data takes up a relatively large amount of space.
Spark also supports the Kryo serialization mechanism. Compared with the default Java serialization, Kryo is faster and its output is smaller, about 1/10 of the size produced by Java serialization. With Kryo, less data is transferred over the network and far less memory is consumed in the cluster.

7.2 Where Kryo serialization takes effect after it is enabled

Once enabled, Kryo serialization takes effect in several places:
(1) External variables used in operator functions. They are sent from the driver over the network and so must be serialized. This improves network transfer performance and reduces memory usage in the cluster.
(2) Serialized RDD persistence, such as StorageLevel.MEMORY_ONLY_SER. Persisting an RDD at such a storage level requires serialization. This reduces memory usage: the less memory the persisted RDD occupies, the less often the objects created by tasks fill up memory and trigger GC.
(3) Shuffle. Tasks in a downstream stage, which has a wide dependency on the upstream stage, pull the result data produced by upstream tasks over the network, which requires serialization. This improves network transfer performance.

7.3 How To Enable the Kryo Serialization Mechanism

import org.apache.spark.SparkConf

// Create a SparkConf object.
val conf = new SparkConf().setMaster(...).setAppName(...)

// Set the serializer to KryoSerializer.
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")

// Register the custom types to be serialized.
conf.registerKryoClasses(Array(classOf[MyClass1], classOf[MyClass2]))

8. Optimize the data format with Fastutil

8.1 Introduction to fastutil

fastutil is a library that extends the Java standard collections framework (Map, List, Set; HashMap, ArrayList, HashSet). It provides type-specific maps, sets, lists, and queues with a smaller memory footprint and faster access. Its collection classes can be used in place of the JDK's own Map, List, and Set.

8.2 Benefits of fastutil

fastutil collection classes reduce the memory footprint and are faster when traversing a collection, reading an element by index (or key), and setting an element's value.

8.3 Scenarios and Application of Fastutil in Spark

8.3.1 Operator functions use external variables

(1) Use a broadcast variable. (2) Use the Kryo serialization library to improve serialization performance and efficiency. (3) If the external variable is a large collection, consider rewriting it with fastutil. The footprint is thus reduced first at the source (fastutil), further by the broadcast variable, and further still by Kryo serialization.

8.3.2 A relatively large set Map/List is used in operator functions

If the operator function, that is, the computation a task executes, creates a relatively large Map, List, or other collection, it may occupy a lot of memory and involve expensive traversal and access operations. Using fastutil for such collections reduces, to some extent, the memory footprint of the collections created by the task, and helps avoid executors filling up their memory and triggering GC frequently, which would degrade performance.

8.3.3 Using fastutil

Step 1: add the Maven dependency:
<dependency>
  <groupId>fastutil</groupId>
  <artifactId>fastutil</artifactId>
  <version>5.0.9</version>
</dependency>
Step 2: replace List<Integer> with IntList. fastutil class names follow the element type: IntList is a list whose elements are int. For maps, the name encodes the types of the key-value mapping; Int2IntMap, for example, maps int keys to int values.

9. Adjust the waiting time for data localization

Before the driver assigns the tasks of each stage of an application, Spark works out which piece of data, that is, which RDD partition, each task will compute. Spark's task assignment algorithm first tries to place every task on the node that holds the data it computes, so that no data has to be transferred over the network. In practice this does not always work: the task may not get a chance to run on the node that holds its data, typically because that node's computing resources are already fully in use. In that case Spark usually waits for a while, 3 seconds by default (not always, but in many cases, and at each locality level), and if it can wait no longer, it falls back to a worse locality level, for example assigning the task to a node close to the one holding the data, and runs the computation there.

9.1 Localization Level

(1) PROCESS_LOCAL: process-local. Code and data are in the same process, that is, the same executor: the task runs in the executor and the data is in that executor's BlockManager. Best performance.
(2) NODE_LOCAL: node-local. Code and data are on the same node; for example, the data is an HDFS block on the node and the task runs in an executor on that node, or data and task are in different executors on the same node. Data has to be transferred between processes.
(3) RACK_LOCAL: rack-local. Data and task are on two different nodes of the same rack; data has to be transferred between nodes over the network. Poorer performance.
(4) ANY: no locality constraint. Data and task can be anywhere in the cluster, not necessarily in the same rack. Worst performance.

9.2 Data Localization Wait Time

spark.locality.wait (default: 3s). The best locality level is tried first; after 3s without success, Spark downgrades to the next level, and so on, until only the worst level is left.

9.3 How Can I Adjust Parameters and Test Them

Each locality level has its own wait time, which defaults to the value of spark.locality.wait, 3s (see the parameter description on the Spark website, as shown in the figure below):
spark.locality.wait.process
spark.locality.wait.node
spark.locality.wait.rack
Set it in code with new SparkConf().set("spark.locality.wait", "10s"), then submit the program to the Spark cluster and watch the job's execution log. Client mode is recommended for testing, because the full log is visible locally. The log shows lines such as Starting task ..., PROCESS_LOCAL, NODE_LOCAL, for example: Starting task 0.0 in stage 1.0 (TID 2, 192.168.200.102, partition 0, NODE_LOCAL, ...
Check whether the locality level of most tasks is PROCESS_LOCAL. If many tasks run at NODE_LOCAL or ANY, it is worth adjusting the locality wait time. After each adjustment, run the job again and check the log to see whether the locality level of most tasks has improved and whether the total running time of the Spark job has decreased. Do not put the cart before the horse: if the locality level improves but the job takes longer because of the extra waiting, leave the setting alone.
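The general and the per-level wait times can be set together on a SparkConf. The values below are arbitrary illustrations, not recommendations, and the object name is made up; time values are written with a unit suffix such as s so that they are read as seconds.

```scala
import org.apache.spark.SparkConf

object LocalityWaitSketch {
  // Builds a configuration with a base wait time and per-level overrides.
  def conf(): SparkConf = new SparkConf()
    .set("spark.locality.wait", "10s")         // base value, used by any level not overridden
    .set("spark.locality.wait.process", "10s") // wait before giving up on PROCESS_LOCAL
    .set("spark.locality.wait.node", "6s")     // wait before giving up on NODE_LOCAL
    .set("spark.locality.wait.rack", "3s")     // wait before giving up on RACK_LOCAL
}
```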

10. Tune the Spark memory model

10.1 Allocating Executor memory in Spark

  • Executor memory is divided into three main parts

    • The first is used by tasks to run the code we write.

    • The second is used when a task pulls the output of the previous stage's tasks through shuffle and performs operations such as aggregation.

    • The third is used for RDD caching.

10.2 Spark memory Model

Before Spark 1.6, executors used the static memory model; starting with Spark 1.6, a unified memory model was added. The choice is controlled by the parameter spark.memory.useLegacyMode. Its default value is false, which selects the new dynamic memory model; to use the old static memory model, set it to true.

10.2.1 Static Memory Model

The static model splits an executor's memory into three parts: the storage region, the execution region, and the remaining "other" region. With the static memory model, these parameters control the split:
spark.storage.memoryFraction: 0.6 by default
spark.shuffle.memoryFraction: 0.2 by default
so the third part gets 0.2.
If a lot of data is cached or the broadcast variables are large, raise spark.storage.memoryFraction. If the code has neither broadcast variables nor caching but does a lot of shuffling, raise spark.shuffle.memoryFraction instead.
  • Disadvantages of the static memory model
Once the storage region and the execution region are configured, a task may run out of execution memory while the storage region still has free space, yet the two cannot borrow from each other. This lack of flexibility is why the new unified memory model was introduced.

10.2.2 Unified Memory Model

The dynamic memory model first reserves 300 MB of memory to guard against out-of-memory errors. It then divides the remaining memory into two parts. One part is sized by spark.memory.fraction, 0.6 by default; the other part is the remaining 0.4. The spark.memory.fraction part is further divided into two smaller parts, storage memory and execution memory, which together make up that 0.6. The split between them is set by spark.memory.storageFraction. If spark.memory.storageFraction is 0.5, storage takes 0.5 of the 0.6, that is 0.3 of the memory, and execution takes the other 0.3.
  • What are the features of the unified memory model?
Storage memory and execution memory can borrow from each other, so the model is not as rigid as the static one. There are rules, though: when execution needs memory that storage has borrowed, storage is the side that gets evicted.
Why is storage the side that loses out? Because data in execution memory is needed immediately, while data in storage memory is not necessarily needed right away.
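The split described above can be worked through with concrete numbers. `UnifiedMemoryEstimate` is a hypothetical estimator, not a Spark API; it simply applies the 300 MB reservation and the two fractions to an assumed heap size, and the 4300 MB heap is an arbitrary example.

```scala
// Hypothetical estimator, not part of Spark: applies the fractions described above.
object UnifiedMemoryEstimate {
  val ReservedMb = 300L

  final case class Regions(unifiedMb: Long, storageMb: Long, executionMb: Long, otherMb: Long)

  def regions(heapMb: Long,
              memoryFraction: Double = 0.6,   // spark.memory.fraction
              storageFraction: Double = 0.5   // spark.memory.storageFraction
             ): Regions = {
    val usableMb = heapMb - ReservedMb
    val unifiedMb = math.round(usableMb * memoryFraction)
    val storageMb = math.round(unifiedMb * storageFraction)
    Regions(unifiedMb, storageMb, unifiedMb - storageMb, usableMb - unifiedMb)
  }

  def main(args: Array[String]): Unit =
    println(regions(heapMb = 4300))
}
```

For a 4300 MB heap, 4000 MB remain after the reservation: 2400 MB go to the unified region (1200 MB storage and 1200 MB execution as the initial split) and 1600 MB to the other part.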

10.2.3 Task Submission Script Reference

  • The following spark-submit command is an example for reference; adjust it to your own situation:
bin/spark-submit \
 --master yarn-cluster \
 --num-executors 100 \
 --executor-memory 6G \
 --executor-cores 4 \
 --driver-memory 1G \
 --conf spark.default.parallelism=1000 \
 --conf spark.storage.memoryFraction=0.5 \
 --conf spark.shuffle.memoryFraction=0.3 \

10.2.4 Personal experience

java.lang.OutOfMemoryError
ExecutorLostFailure
Executor exit code 143
executor lost
heartbeat timeout
shuffle file lost
If the problem still cannot be solved, see the next lecture on data skew tuning.
