1. Spark
1.1 How many ways can Spark be deployed? Briefly describe each.
- 1) Local: runs on a single machine, usually for practice or testing.
- 2) Standalone: a resource-scheduling cluster built from a Master and Slaves (workers); Spark jobs are submitted to the Master. In this mode Spark is its own scheduling system.
- 3) YARN: the Spark client connects directly to YARN, so no separate Spark cluster needs to be built. There are two modes, yarn-client and yarn-cluster; the main difference is the node on which the Driver program runs.
- 4) Mesos: rarely used in domestic environments.
1.2 How are Spark jobs submitted: through a JavaEE interface or through scripts?
Through shell scripts.
1.3 Parameters for submitting Spark jobs (key point)
1) Several important parameters when submitting a job
- --executor-cores: number of cores used by each executor; the default on YARN is 1. The official recommendation is two to five cores; our company uses four.
- --num-executors: number of executors to start; the default on YARN is 2.
- --executor-memory: memory per executor; the default is 1 GB.
- --driver-cores: number of cores used by the driver; the default is 1.
- --driver-memory: driver memory size; the default is 1 GB in current releases (512 MB in early 1.x releases).
2) A sample submission command
# Options must come before the application JAR; anything after the JAR is passed to the application as arguments.
spark-submit \
--master local[5] \
--name "Spark Job Name" \
--driver-cores 2 \
--driver-memory 8g \
--executor-cores 4 \
--num-executors 10 \
--executor-memory 8g \
--class PackageName.ClassName \
XXXX.jar \
InputPath \
OutputPath
1.4 Briefly describe Spark’s architecture and job submission process (draw a diagram and explain the role of each part) (key point)
Reference: https://blog.csdn.net/wuxintd…
1.4.1 Standalone
1.4.2 yarn-cluster
1.5 How do you understand RDD lineage in Spark? (written-exam focus)
Reference: https://blog.csdn.net/wuxintd…
In terms of lineage, RDD dependencies fall into two kinds: narrow dependencies and wide dependencies. Lineage makes fault tolerance efficient, because a lost partition can be recomputed from its parent RDDs, and the kind of dependency plays an important role in dividing a job into stages and tasks.
1.6 Describe how Spark divides a job into stages and how the number of tasks in each stage is determined. (written-exam focus)
Stage: a job is divided into stages according to the dependencies between RDDs; each wide dependency marks a stage boundary.
Task: a stage is a TaskSet; the stage is split into tasks according to the number of partitions, one task per partition.
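The rules above can be checked on a tiny RDD. The following is a minimal sketch, assuming a local master; the object name StageExample and the sample data are made up for illustration. It inspects the dependency type of each RDD and the partition counts that determine the number of tasks.

```scala
import org.apache.spark.{OneToOneDependency, ShuffleDependency, SparkConf, SparkContext}

object StageExample {
  // Returns (map is narrow?, reduceByKey is wide?, tasks before the shuffle, tasks after it).
  def inspect(sc: SparkContext): (Boolean, Boolean, Int, Int) = {
    val words = sc.parallelize(Seq("a", "b", "a", "c"), 4) // 4 partitions
    val pairs = words.map(w => (w, 1))                      // narrow: stays in the same stage
    val counts = pairs.reduceByKey(_ + _, 2)                // wide: starts a new stage with 2 partitions

    val narrow = pairs.dependencies.head.isInstanceOf[OneToOneDependency[_]]
    val wide = counts.dependencies.head.isInstanceOf[ShuffleDependency[_, _, _]]
    (narrow, wide, pairs.getNumPartitions, counts.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("StageExample"))
    try println(inspect(sc)) finally sc.stop()
  }
}
```

Running an action on counts would therefore produce two stages: one with 4 tasks (one per input partition) and one with 2 tasks (one per shuffle partition).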
1.7 List at least 8 Spark transformation operators and briefly describe what each does (key point)
Reference:https://blog.csdn.net/wuxintd…
1) map(func): returns a new RDD formed by passing each input element through func.
2) mapPartitions(func): similar to map, but runs independently on each partition (shard) of the RDD, so on an RDD of type T the function must have type Iterator[T] => Iterator[U]. With N elements in M partitions, the map function is called N times while the mapPartitions function is called M times; each call processes one whole partition.
3) reduceByKey(func, [numTasks]): called on an RDD of (K, V) pairs, returns an RDD of (K, V) pairs in which the values of each key are aggregated with the given reduce function. The number of reduce tasks can be set with the optional second argument.
4) aggregateByKey(zeroValue: U, [partitioner: Partitioner])(seqOp: (U, V) => U, combOp: (U, U) => U): on an RDD of key-value pairs, aggregates the values of each key. Within each partition, seqOp folds every value of a key into an accumulator that starts from zeroValue. combOp then merges the per-partition results of the same key (the first two are combined, the result is combined with the next one, and so on), and the key together with the final result is output as a new key-value pair.
5) combineByKey(createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C):
For each key K, combines its values V into one combined value C, for example a collection.
1. createCombiner: combineByKey() walks through all elements of a partition, and each element’s key either has not been encountered yet or is the same as the key of an earlier element. For a new key, combineByKey() calls createCombiner() to create the initial value of the accumulator for that key.
2. mergeValue: if the key has already been encountered while processing the current partition, mergeValue() merges the new value into the current value of that key’s accumulator.
3. mergeCombiners: since each partition is handled independently, the same key can have several accumulators. If two or more partitions hold an accumulator for the same key, the user-supplied mergeCombiners() method combines the results of those partitions.
…
Choose the operators you are most familiar with and describe those.
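The three functions of combineByKey are easiest to see in a per-key average. The following is a minimal sketch, assuming a local master; the object name CombineByKeyExample and the sample scores are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CombineByKeyExample {
  // Computes the average value per key with combineByKey.
  def averages(sc: SparkContext): Map[String, Double] = {
    val scores = sc.parallelize(Seq(("a", 1), ("b", 4), ("a", 3), ("b", 6), ("a", 5)), 2)
    val sumAndCount = scores.combineByKey(
      (v: Int) => (v, 1),                                             // createCombiner: first value of a key in a partition
      (acc: (Int, Int), v: Int) => (acc._1 + v, acc._2 + 1),          // mergeValue: same key, same partition
      (x: (Int, Int), y: (Int, Int)) => (x._1 + y._1, x._2 + y._2))   // mergeCombiners: same key, different partitions
    sumAndCount.mapValues { case (sum, n) => sum.toDouble / n }.collect().toMap
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("CombineByKeyExample"))
    try println(averages(sc)) finally sc.stop()
  }
}
```

The accumulator type C here is a (sum, count) pair, which differs from the value type V; that difference is the reason to reach for combineByKey or aggregateByKey rather than reduceByKey.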
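1.8 List at least 6 Spark actions and briefly describe what each does (key point).
Reference:https://blog.csdn.net/wuxintd…
1) reduce(func): aggregates all elements of the RDD with func and returns the result to the driver.
2) collect(): returns all elements of the RDD to the driver as an array.
3) first(): returns the first element of the RDD.
4) take(n): returns an array with the first n elements of the RDD.
5) aggregate(zeroValue)(seqOp, combOp): folds the elements of each partition with seqOp, starting from zeroValue, then merges the per-partition results with combOp.
6) countByKey(): on an RDD of (K, V) pairs, returns a map from each key to the number of elements with that key.
7) foreach(func): runs func on every element of the RDD.
8) saveAsTextFile(path): writes the elements of the RDD as text files under the given path.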
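1.9 List the Spark operators that cause a shuffle and briefly describe what they do.
reduceByKey: aggregates the values of each key with a reduce function.
groupByKey: groups the values of each key into one sequence.
…
The other …ByKey operators.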
1.10 Describe the workflow of Spark’s two core shuffles, HashShuffle and SortShuffle (unoptimized and optimized HashShuffle, ordinary and bypass SortShuffle) (key point)
Unoptimized HashShuffle:
Optimized HashShuffle:
Ordinary SortShuffle:
Bypass SortShuffle: the bypass mechanism is enabled when the number of shuffle read tasks is less than or equal to the value of the spark.shuffle.sort.bypassMergeThreshold parameter (default 200) and the operator does no map-side aggregation.
1.11 Differences between the common Spark operators reduceByKey and groupByKey: which one is better? (key point)
reduceByKey: aggregates by key and combines (pre-aggregates) on the map side before the shuffle; returns RDD[(K, V)].
groupByKey: groups by key and shuffles directly, with no pre-aggregation; returns RDD[(K, Iterable[V])].
Development guidance: reduceByKey performs better than groupByKey and is the recommended choice. However, check that pre-aggregation does not change the business logic.
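For a sum, both operators give the same answer, which makes the difference purely one of cost. The following is a minimal sketch, assuming a local master; the object name ReduceVsGroup and the sample pairs are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ReduceVsGroup {
  // Returns the per-key sums computed with reduceByKey and with groupByKey.
  def sums(sc: SparkContext): (Map[String, Int], Map[String, Int]) = {
    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)), 2)

    // Pre-aggregates inside each partition, so at most one record per key
    // and partition crosses the network.
    val viaReduce = pairs.reduceByKey(_ + _).collect().toMap

    // Ships every record across the network and only then sums the group.
    val viaGroup = pairs.groupByKey().mapValues(_.sum).collect().toMap

    (viaReduce, viaGroup)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ReduceVsGroup"))
    try println(sums(sc)) finally sc.stop()
  }
}
```

The results match here because addition is associative and commutative. For logic that needs all values of a key at once, pre-aggregation is not applicable, which is the business-logic caveat mentioned above.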
1.12 Relationship and differences between repartition and coalesce
1) Relationship:
Both change the number of partitions of an RDD. repartition(numPartitions) is implemented by calling coalesce(numPartitions, shuffle = true).
2) Differences:
repartition always shuffles, whereas coalesce shuffles or not depending on the shuffle argument passed to it.
In general:
- Use repartition to increase the number of partitions of an RDD
- Use coalesce to reduce the number of partitions
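The effect on partition counts can be observed directly. The following is a minimal sketch, assuming a local master; the object name PartitionExample and the partition numbers are chosen only for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object PartitionExample {
  // Returns the partition counts after coalesce(2), coalesce(16) and repartition(16).
  def counts(sc: SparkContext): (Int, Int, Int) = {
    val rdd = sc.parallelize(1 to 1000, 8)  // starts with 8 partitions
    val fewer = rdd.coalesce(2)             // no shuffle: partitions are merged
    val notMore = rdd.coalesce(16)          // no shuffle: cannot grow, stays at 8
    val more = rdd.repartition(16)          // shuffle: same as coalesce(16, shuffle = true)
    (fewer.getNumPartitions, notMore.getNumPartitions, more.getNumPartitions)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("PartitionExample"))
    try println(counts(sc)) finally sc.stop()
  }
}
```

Without a shuffle, coalesce can only merge existing partitions, which is why asking it for more partitions has no effect.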
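1.13 Describe the cache, persist and checkpoint mechanisms in Spark, and point out their differences and connections
All three persist an RDD.
Cache and persist: cache() is persist() with the default storage level (memory only for RDDs), while persist() lets you choose the storage level. Neither truncates the lineage; the cached data is reused during later computation.
Checkpoint: writes to disk and truncates the lineage. checkpoint() must be called before any job has been submitted on the RDD for it to take effect, and the checkpoint process submits an additional job.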
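A common pattern is to cache an RDD before checkpointing it, so that the extra checkpoint job reads the cached data instead of recomputing it. The following is a minimal sketch, assuming a local master and a temporary local directory as the checkpoint directory (a real cluster would use a reliable file system such as HDFS); the object name PersistenceExample is hypothetical.

```scala
import java.nio.file.Files
import org.apache.spark.{SparkConf, SparkContext}

object PersistenceExample {
  // Returns (was the RDD checkpointed?, sum of the squares of 1..100).
  def run(sc: SparkContext): (Boolean, Long) = {
    // Assumption: a temporary local directory stands in for HDFS.
    sc.setCheckpointDir(Files.createTempDirectory("spark-ckpt").toString)

    val squares = sc.parallelize(1 to 100, 4).map(x => x.toLong * x)
    squares.cache()       // keep in memory; lineage is kept
    squares.checkpoint()  // must be requested before the first job on this RDD

    // The first action computes and caches the RDD; a follow-up job then
    // writes the checkpoint files and truncates the lineage.
    val total = squares.reduce(_ + _)
    (squares.isCheckpointed, total)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("PersistenceExample"))
    try println(run(sc)) finally sc.stop()
  }
}
```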
1.14 Describe the rationale and use of shared variables (broadcast variables and accumulators) in Spark. (key point)
An accumulator is a distributed variable mechanism provided by Spark. It works much like counters in MapReduce: changes made across the cluster are aggregated back together. A common use of accumulators is counting events during job execution while debugging. Broadcast variables are used to distribute large objects efficiently.
Why shared variables exist:
Normally, when you pass a function to Spark, such as to map() or filter(), it can use variables defined in the driver program, but each task running in the cluster gets a new copy of those variables, and updating the copies does not affect the corresponding variables in the driver.
Spark’s two kinds of shared variables, accumulators and broadcast variables, lift this limitation for two common communication patterns: result aggregation and broadcast, respectively.
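Both kinds of shared variable fit into one small job. The following is a minimal sketch, assuming a local master; the object name SharedVariablesExample, the lookup table and the input codes are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object SharedVariablesExample {
  // Returns the resolved names and the number of codes missing from the lookup table.
  def run(sc: SparkContext): (Seq[String], Long) = {
    // Broadcast: the lookup table is shipped to each executor once, not once per task.
    val countryNames = sc.broadcast(Map("cn" -> "China", "us" -> "United States"))
    // Accumulator: tasks only add to it; the driver reads the aggregated value.
    val unknown = sc.longAccumulator("unknownCodes")

    val codes = sc.parallelize(Seq("cn", "us", "xx", "cn"), 2)
    val names = codes.map { code =>
      // The default branch runs only when the code is not in the table.
      countryNames.value.getOrElse(code, { unknown.add(1); "unknown" })
    }.collect().toSeq

    (names, unknown.value)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("SharedVariablesExample"))
    try println(run(sc)) finally sc.stop()
  }
}
```

Note that an accumulator updated inside a transformation can be incremented more than once if a task is retried, so counts gathered this way are best treated as debugging aids, as described above.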
1.15 When a Spark job performs database operations, how do you reduce the number of database connections it opens?
Use foreachPartition instead of foreach and obtain the database connection inside foreachPartition, so that one connection serves a whole partition rather than a single record.
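The saving can be made visible by counting how many connections are opened. The following is a minimal sketch, assuming a local master; FakeConnection is a hypothetical stand-in for a real database connection (for example a JDBC connection), and the accumulator exists only to count how often it is created.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.util.LongAccumulator

// Hypothetical stand-in for a real database connection.
class FakeConnection(opened: LongAccumulator) {
  opened.add(1)                        // record that a connection was opened
  def insert(record: Int): Unit = ()   // a real implementation would write to the database
  def close(): Unit = ()
}

object ForeachPartitionExample {
  // Writes 100 records from 4 partitions and returns how many connections were opened.
  def connectionsOpened(sc: SparkContext): Long = {
    val opened = sc.longAccumulator("connectionsOpened")
    sc.parallelize(1 to 100, 4).foreachPartition { records =>
      val conn = new FakeConnection(opened)  // one connection per partition
      try records.foreach(r => conn.insert(r)) finally conn.close()
    }
    opened.value
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("ForeachPartitionExample"))
    try println(connectionsOpened(sc)) finally sc.stop()
  }
}
```

With foreach, the same job would need a connection per record (100 here) instead of one per partition (4 here).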
1.16 Describe the differences and relationships among RDD, DataFrame and DataSet in SparkSQL. (written-exam focus)
1) RDD
Advantages:
- Compile-time type safety: type errors can be detected at compile time
- Object-oriented programming style: data is manipulated directly through object.member access
Disadvantages:
- Performance overhead of serialization and deserialization: both cluster communication and IO require serializing and deserializing the object’s structure as well as its data
- Performance overhead of GC: frequent creation and destruction of objects inevitably increases GC pressure
2) DataFrame
DataFrame introduces a schema and off-heap storage.
Schema: every row of the data has the same structure, and that structure is stored in the schema. Spark reads the data through the schema, so for communication and IO it only serializes and deserializes the data itself and leaves the structure out.
3) DataSet
DataSet combines the advantages of RDD and DataFrame and introduces a new concept, the Encoder.
When serializing data, the Encoder generates bytecode that interacts with off-heap memory, which gives on-demand access to the data without deserializing the entire object. Spark does not yet provide an API for custom encoders, but will do so in the future.
The conversion between the three:
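A minimal sketch of converting among the three, assuming a local master; the Person case class, the object name ConversionExample and the sample rows are hypothetical.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, Dataset, Row, SparkSession}

// Hypothetical record type; it must be defined at top level so Spark can derive an Encoder.
case class Person(name: String, age: Int)

object ConversionExample {
  // Goes RDD -> DataFrame -> DataSet -> RDD and returns the final contents.
  def roundTrip(spark: SparkSession): Seq[Person] = {
    import spark.implicits._  // brings toDF, toDS and the encoders into scope

    val rdd: RDD[Person] = spark.sparkContext.parallelize(Seq(Person("Ann", 30), Person("Bob", 25)))
    val df: DataFrame = rdd.toDF()            // RDD -> DataFrame (schema from the case class)
    val ds: Dataset[Person] = df.as[Person]   // DataFrame -> DataSet (adds the type)
    val dfAgain: DataFrame = ds.toDF()        // DataSet -> DataFrame (drops the type)
    val rows: RDD[Row] = dfAgain.rdd          // DataFrame -> RDD[Row]
    val people: RDD[Person] = ds.rdd          // DataSet -> RDD[Person]

    people.collect().toSeq
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val spark = SparkSession.builder().master("local[2]").appName("ConversionExample").getOrCreate()
    try println(roundTrip(spark)) finally spark.stop()
  }
}
```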
1.17 What is the difference between a JOIN and a LEFT JOIN in SparkSQL?
join is similar to an inner join in SQL. It returns the records that match between the first dataset and the second, filtering out those that have no match.
leftJoin is similar to a left outer join in SQL. The result keeps every record of the first dataset, and the fields from the second dataset are null for records that cannot be matched.
A LEFT SEMI JOIN can be used instead of a LEFT JOIN in some scenarios:
A left semi join behaves like an IN (key set) test. When the right table contains duplicate records for a key, the left semi join skips them and emits the left row once, whereas a join keeps iterating and emits one row per match. With a left semi join, only column names of the left table may appear in the final SELECT, because only the join key of the right table takes part in the join.
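The three join types can be compared by their row counts on a small input with a duplicated key on the right. The following is a minimal sketch, assuming a local master; the object name JoinExample, the column names and the sample rows are hypothetical.

```scala
import org.apache.spark.sql.SparkSession

object JoinExample {
  // Returns the row counts of the inner join, the left outer join and the left semi join.
  def joinCounts(spark: SparkSession): (Long, Long, Long) = {
    import spark.implicits._

    val left = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "l")
    val right = Seq((1, "x"), (1, "y"), (2, "z")).toDF("id", "r")  // id 1 appears twice, id 3 never

    val inner = left.join(right, Seq("id"), "inner").count()         // id 1 twice + id 2 once
    val leftOuter = left.join(right, Seq("id"), "left").count()      // inner rows + id 3 with a null r
    val leftSemi = left.join(right, Seq("id"), "left_semi").count()  // ids 1 and 2, once each

    (inner, leftOuter, leftSemi)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val spark = SparkSession.builder().master("local[2]").appName("JoinExample").getOrCreate()
    try println(joinCounts(spark)) finally spark.stop()
  }
}
```

The left semi join result contains only the columns id and l, which matches the restriction on the final SELECT described above.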
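1.18 Write a Spark implementation of WordCount in Scala (handwritten-code focus)
import org.apache.spark.{SparkConf, SparkContext}

// Run locally on all available cores.
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
val sc = new SparkContext(conf)
sc.textFile("/input")
  .flatMap(_.split(" "))      // split each line into words
  .map((_, 1))                // pair each word with a count of 1
  .reduceByKey(_ + _)         // sum the counts of each word
  .saveAsTextFile("/output")  // write the (word, count) pairs
sc.stop()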
1.19 How do you implement topN with Spark? (describe the approach or use pseudocode) (key point)
Method 1:
- (1) Group the data with groupByKey
- (2) Convert the values of each key to an array and sort it in Scala with sortBy or sortWith (inside mapValues). If the data is too large, this can cause an OOM.
Method 2:
- (1) Take out all the keys
- (2) Iterate over the keys, taking one key at a time and sorting its data with Spark’s sorting operator
Method 3:
- (1) Use a custom partitioner that partitions by key, so that different keys go to different partitions
- (2) Sort each partition with Spark’s sorting operator
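Method 1 is the shortest to write down. The following is a minimal sketch of it, assuming a local master and that the values of any single key fit in memory; the object name TopNExample and the sample scores are hypothetical.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object TopNExample {
  // Returns the n largest values of each key, in descending order (Method 1).
  def topNPerKey(sc: SparkContext, n: Int): Map[String, List[Int]] = {
    val scores = sc.parallelize(
      Seq(("math", 90), ("math", 75), ("math", 98), ("art", 60), ("art", 88)), 2)

    scores.groupByKey()                                // (1) group the values by key
      .mapValues(_.toList.sortWith(_ > _).take(n))     // (2) sort each group in Scala, keep the top n
      .collect()
      .toMap
  }

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("TopNExample"))
    try println(topNPerKey(sc, 2)) finally sc.stop()
  }
}
```

Because toList materializes every value of a key on one executor, this is exactly where the OOM risk of Method 1 comes from.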
1.20 Jingdong: compare performance before and after tuning in detail (for example, if you adjusted the number of map tasks, how many there were before and after, and what improved)
Here is an example. If we read several hundred files, there will be several hundred map tasks, and a join after reading them is very slow. In that case we can apply coalesce: for example, 240 map tasks can be merged into 60. This is a narrow dependency, so the merge itself needs no shuffle. The number of files produced by the later shuffle then drops sharply, which improves the running time of the join.
2. SparkStreaming
Reference: https://chbxw.blog.csdn.net/a…
2.1 In what ways can SparkStreaming consume data from Kafka, and how do they differ?
1. Receiver-based approach
This approach uses a Receiver to fetch the data. The Receiver is implemented with Kafka’s high-level Consumer API. The data it fetches from Kafka is stored in the memory of the Spark executors (so a sudden data spike or a backlog of batches can easily exhaust memory), and Spark Streaming then launches jobs to process that data.
With the default configuration, however, this approach can lose data when an underlying failure occurs. For high reliability with zero data loss, you must enable Spark Streaming’s Write Ahead Log (WAL) mechanism, which synchronously writes the received Kafka data to a write-ahead log on a distributed file system such as HDFS. Even if an underlying node fails, the data can then be recovered from the write-ahead log.
2. Direct approach
The direct approach, which does not use a Receiver, was introduced in Spark 1.3 to provide a more robust mechanism. Instead of receiving data through a Receiver, it periodically queries Kafka for the latest offset of each topic+partition and uses that to define the offset range of each batch. When the job that processes the data starts, it uses Kafka’s simple Consumer API to read the specified offset range from Kafka.
The advantages are as follows:
- Simplified parallel reads: to read multiple partitions you do not need to create multiple input DStreams and union them. Spark creates as many RDD partitions as there are Kafka partitions and reads from Kafka in parallel, so there is a one-to-one mapping between Kafka partitions and RDD partitions.
- High performance: in the Receiver-based approach, zero data loss requires the WAL. This is inefficient because the data is replicated twice: Kafka already replicates the data for reliability, and it is then copied again into the WAL. The direct approach does not depend on a Receiver and does not need the WAL. As long as Kafka holds a replica of the data, the data can be recovered from Kafka.
- Exactly-once semantics.
3. Comparison:
The Receiver-based approach uses Kafka’s high-level API and saves the consumed offsets in ZooKeeper. This is the traditional way to consume Kafka data. Combined with the WAL it can guarantee zero data loss, but it cannot guarantee that each record is processed exactly once; a record may be processed twice, because Spark and ZooKeeper can get out of sync.
The direct approach uses Kafka’s simple API, and Spark Streaming itself tracks the consumed offsets and stores them in the checkpoint. Because Spark keeps the offsets consistent with its own processing, the data is guaranteed to be consumed exactly once.
In production environments, the direct approach is used in most cases.
2.2 Briefly introduce SparkStreaming window functions (key point)
SparkStreaming window functions are a layer on top of the ordinary per-batch computation. Each computation covers several batches at once, and a slide-interval parameter sets where the next computation starts once the current one has finished.
In the figure, time1 is the batch interval used by SparkStreaming. The dashed box and the large solid box are windows, whose size must be an integer multiple of the batch interval. The distance between the dashed box and the large solid box (the number of batches between them) is the slide interval.
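The relationship between batch interval, window size and slide interval can be written down with reduceByKeyAndWindow. The following is a minimal sketch, assuming a local master and an in-memory queue as the input source in place of a real stream; the object name WindowExample and the durations (2 s batches, 6 s window, 4 s slide) are chosen only for illustration.

```scala
import scala.collection.mutable
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Duration, Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream

object WindowExample {
  val batch: Duration = Seconds(2)  // batch interval

  // Counts words over a 6-second window (3 batches) that slides every 4 seconds (2 batches).
  // Both durations must be integer multiples of the batch interval.
  def windowedCounts(words: DStream[String]): DStream[(String, Int)] =
    words.map(w => (w, 1))
      .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(6), Seconds(4))

  def main(args: Array[String]): Unit = {
    // Assumption: local mode with two threads, for demonstration only.
    val conf = new SparkConf().setMaster("local[2]").setAppName("WindowExample")
    val ssc = new StreamingContext(conf, batch)
    // Assumption: a queue of RDDs stands in for a real source such as Kafka.
    val queue = mutable.Queue[RDD[String]](ssc.sparkContext.parallelize(Seq("a", "b", "a")))
    windowedCounts(ssc.queueStream(queue)).print()
    ssc.start()
    ssc.awaitTerminationOrTimeout(10000)  // run for about ten seconds
    ssc.stop()
  }
}
```

Spark rejects a window or slide duration that is not a multiple of the batch interval when the windowed stream is defined, so a misconfiguration shows up before the job starts.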
3. SparkSQL