This is the third day of my participation in the Gwen Challenge.

Preface

Welcome to star our GitHub repository: github.com/bin39232820… The best time to plant a tree was ten years ago; the second-best time is now.


The basics of Scala and Spark are pretty much covered, so here is the first one.

Today we continue learning Spark.

Overview of RDD

What is RDD?

A Resilient Distributed Dataset (RDD) is the most basic data abstraction in Spark. It represents an immutable, partitioned collection whose elements can be computed in parallel. RDDs have the characteristics of a data-flow model: automatic fault tolerance, location-aware scheduling, and scalability. RDDs allow users to explicitly cache a working set in memory when executing multiple queries and reuse it in subsequent queries, which greatly improves query speed.

The properties of RDD

(1) A list of partitions, the basic constituent units of the data set. For an RDD, each partition is processed by one compute task, and the number of partitions determines the granularity of parallel computation. The user can specify the number of partitions when creating an RDD; if not specified, a default value is used, which is the number of CPU cores allocated to the program.

(2) A function that computes each partition. In Spark, an RDD is computed partition by partition, and each RDD implements a compute function for this purpose. The compute function composes the iterators, so the result of each computation does not need to be saved.

(3) Dependencies between RDDs. Each transformation of an RDD generates a new RDD, so there is a pipeline-like dependency between RDDs. If the data of some partitions is lost, Spark can recompute the lost partitions through this dependency relationship instead of recomputing all partitions of the RDD.

(4) A Partitioner, i.e. the partitioning function of the RDD. Spark currently implements two kinds of partitioning functions: the hash-based HashPartitioner and the range-based RangePartitioner. Only key-value RDDs have a Partitioner; for a non-key-value RDD the Partitioner is None. The Partitioner determines not only the number of partitions of the RDD itself, but also the number of partitions in the output of the parent RDD's shuffle.

(5) A list that stores the preferred location for accessing each Partition. For an HDFS file, this list stores the block location of each Partition. Following the principle that "moving computation is cheaper than moving data", Spark tries to schedule computing tasks to the storage locations of the data blocks they process.
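
These properties can be inspected directly from the spark-shell. A minimal sketch (the file path is just a placeholder):

scala> val pairs = sc.textFile("/spark/hello.txt").map(line => (line, 1)).reduceByKey(_ + _)
scala> pairs.partitions.length                         // (1) number of partitions
scala> pairs.dependencies                              // (3) dependencies on the parent RDD
scala> pairs.partitioner                               // (4) Some(HashPartitioner) for this key-value RDD
scala> pairs.preferredLocations(pairs.partitions(0))   // (5) preferred locations of the first partition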

How to create an RDD

  • An RDD can be created from data sets in external storage systems, including local file systems and any data source supported by Hadoop, such as HDFS, Cassandra, and HBase
scala> val file = sc.textFile("/spark/hello.txt")
  • Create an RDD by parallelizing a collection
scala> val array = Array(1, 2, 3, 4, 5)
array: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd = sc.parallelize(array)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at parallelize at <console>:26
  • Other ways

An RDD can also be created by reading from a database and other sources, and a new RDD can be obtained by converting an existing RDD.
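
A small spark-shell sketch of these creation paths (the file path and partition counts are placeholders):

scala> val fromFile  = sc.textFile("/spark/hello.txt", 4)       // external storage, at least 4 partitions requested
scala> val fromArray = sc.parallelize(Array(1, 2, 3, 4, 5), 2)  // local collection, 2 partitions
scala> val derived   = fromArray.map(_ * 2)                     // new RDD converted from an existing RDD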

RDD programming APIs

Spark supports two types of operators: Transformations and Actions

  • Transformation

The main thing a Transformation does is produce a new RDD from an existing one. Transformations are lazy (lazy loading): the code of a Transformation operator is not executed immediately; it only actually runs when the program encounters an Action operator. This design makes Spark run more efficiently.

Common Transformations:
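
A few common transformations, sketched in the spark-shell (they are all lazy, so nothing is computed yet; the file path is a placeholder):

scala> val lines     = sc.textFile("/spark/hello.txt")   // lazy: the file is not read yet
scala> val words     = lines.flatMap(_.split(","))       // split each line into words
scala> val longWords = words.filter(_.length > 3)        // keep only the longer words
scala> val pairs     = words.map(word => (word, 1))      // map each word to a (word, 1) pair
scala> val counts    = pairs.reduceByKey(_ + _)          // still lazy: no job has run so far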

  • Action

An Action triggers the code to actually run; every Spark application needs at least one Action.

Commonly used Actions:
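
A few common actions, continuing the sketch above (each of these triggers a job):

scala> counts.count()                        // number of distinct words
scala> counts.take(3)                        // first three (word, count) pairs
scala> counts.collect()                      // bring all results back to the driver
scala> counts.saveAsTextFile("/spark/out")   // write the results to external storage (placeholder path)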

Spark WordCount code

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
</dependencies>

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import scala.Tuple2;

/**
 * @author 小66
 * @version 1.0
 * @date 2020/12/26 14:43
 */
public class SparkWordCountWithJava8 {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf();
        conf.setAppName("WordCount");
        conf.setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> fileRDD = sc.textFile("E:\\hello.txt");
        // Split each line into words
        JavaRDD<String> wordRdd = fileRDD.flatMap(line -> Arrays.asList(line.split(",")).iterator());
        // Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> wordOneRDD = wordRdd.mapToPair(word -> new Tuple2<>(word, 1));
        // Sum the counts per word
        JavaPairRDD<String, Integer> wordCountRDD = wordOneRDD.reduceByKey((x, y) -> x + y);
        // Swap to (count, word) so the result can be sorted by count
        JavaPairRDD<Integer, String> count2WordRDD = wordCountRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        JavaPairRDD<Integer, String> sortRDD = count2WordRDD.sortByKey(false);
        // Swap back to (word, count) and save the result
        JavaPairRDD<String, Integer> resultRDD = sortRDD.mapToPair(tuple -> new Tuple2<>(tuple._2, tuple._1));
        resultRDD.saveAsTextFile("E:\\result8");
    }
}

Spark Running Process

Basic concepts of Spark

(1) Application: represents your Application

(2) Driver: represents the main() function, which creates the SparkContext. The SparkContext communicates with the ClusterManager to apply for resources and to allocate and monitor tasks. After the program finishes, the SparkContext is closed.

(3) Executor: a process that an Application runs on a Worker node; it is responsible for running Tasks and storing data in memory or on disk. In Spark on YARN mode, the process is named CoarseGrainedExecutorBackend. A CoarseGrainedExecutorBackend process has one and only one Executor object, which wraps each Task in a TaskRunner and takes a free thread from the thread pool to run it; the number of Tasks each CoarseGrainedExecutorBackend can run in parallel therefore depends on the number of CPU cores allocated to it.

(4) Worker: a node in the cluster that can run Application code. In Standalone mode, it refers to the worker nodes configured through the slave file; in Spark on YARN mode, it refers to the NodeManager nodes.

(5) Task: the unit of work that runs in an Executor process; multiple Tasks make up a Stage.

(6) Job: a parallel computation consisting of multiple Tasks, triggered by an Action.

(7) Stage: each Job is divided into several groups of Tasks; each group of Tasks is called a Stage, also known as a TaskSet.

(8) DAGScheduler: builds a Stage-based DAG for each Job and submits Stages to the TaskScheduler. Stages are divided according to the dependencies between RDDs.

(9) TaskScheduler: submits TaskSets to the Workers (the cluster) to run; this is where each Executor is assigned its Tasks.

Spark running process

The basic running process of Spark

Notes

  • Build the environment for running the Spark Application (start the SparkContext). The SparkContext registers with the resource manager (e.g. Standalone, Mesos, or YARN) and applies for Executor resources.
  • The resource manager allocates Executor resources and starts StandaloneExecutorBackend; the Executors send their status to the resource manager as heartbeats while they run.
  • The SparkContext builds the DAG graph, breaks it into Stages, and sends the TaskSets to the Task Scheduler; Executors apply to the SparkContext for Tasks.
  • The Task Scheduler issues Tasks to the Executors, while the SparkContext issues the application code to the Executors.
  • Tasks run on Executors and, when they finish, all resources are released.

The illustration

Spark operating architecture features

  • Each Application gets its own Executor processes, which stay up for the duration of the Application and run Tasks in a multithreaded manner. This Application isolation mechanism has advantages both from the scheduling perspective (each Driver schedules its own Tasks) and from the running perspective (Tasks from different Applications run in different JVMs). Of course, this also means that Spark Applications cannot share data with each other unless the data is written to an external storage system.
  • Spark is agnostic to the resource manager: as long as it can obtain Executor processes and they keep communicating with each other, it works.
  • The Client submitting SparkContext should be close to the Worker node (the node running Executor), preferably in the same Rack, because there is a lot of information exchanged between SparkContext and Executor during Spark Application execution. If you want to run SparkContext in a remote cluster, it is better to use RPC to submit SparkContext to the cluster rather than run SparkContext away from the Worker.
  • Tasks use optimization mechanisms such as data locality and speculative execution.

DAGScheduler

A Job consists of multiple Stages; a Stage consists of multiple Tasks of the same type. A Task can be a ShuffleMapTask or a ResultTask. A Dependency can be a ShuffleDependency or a NarrowDependency.

Stages are split at wide (shuffle) dependencies.
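
A quick way to see where the split falls is RDD.toDebugString: narrow dependencies (such as map and filter) stay in one stage, while a wide (shuffle) dependency such as reduceByKey starts a new one. A minimal sketch (placeholder path):

scala> val wordCounts = sc.textFile("/spark/hello.txt").flatMap(_.split(",")).map((_, 1)).reduceByKey(_ + _)
scala> println(wordCounts.toDebugString)   // the indentation change marks the shuffle, i.e. the stage boundary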

It maintains waiting Jobs and active Jobs, as well as waiting Stages, active Stages, and failed Stages, and maps Stages to Jobs.

Main Functions:

  • Receiving a submitted Job is the main entry point: submitJob(RDD, …) or runJob(RDD, …); SparkContext calls these two methods. The DAGScheduler generates a Stage and submits it, then checks whether any parent Stage is incomplete; if so, it submits and waits for the parent Stage, and so on. The result is that the DAGScheduler holds several waiting Stages and one running Stage. After the running Stage is submitted, the types of the Tasks in that Stage are analysed and a Task description, i.e. a TaskSet, is generated. TaskScheduler.submitTasks(taskSet, …) is then called to hand the Task description to the TaskScheduler, which allocates resources to each TaskSet and triggers execution according to the amount of resources and the trigger-allocation criteria. After the DAGScheduler submits a Job, a JobWaiter object is returned asynchronously; through it the running status of the Job can be queried and the Job can be cancelled, and after the Job executes successfully the result is processed and returned.

  • If a Task executes successfully, it is removed from the corresponding Stage and some counting is done. If the Task is a ResultTask, the Accumulator counter is incremented by one, the Task is marked as finished in the Job, and the total number of finished Tasks of the Job is incremented by one; when the number of finished Tasks equals the number of partitions, the Stage is complete: it is marked as finished, removed from the running Stages, and some cleanup for removing the Stage is done. If the Task is a ShuffleMapTask, the Accumulator counter is incremented by one and an output location is added to the Stage; the MapStatus returned by the ShuffleMapTask contains the location information and the block size (optionally compressed or uncompressed). At the same time, if the Stage is complete, the shuffleId and location information of this Stage are registered with the MapOutputTracker. Then the output locations of the Stage are checked for empty entries; if any are empty, some Tasks have failed and the whole Stage is resubmitted; otherwise, the next Stage that needs to run is submitted from the waiting Stages. If a Task is resubmitted, it is added back to the corresponding Stage. If a Task fails with a fetch failure, the corresponding Stage is immediately marked as finished and removed from the running Stages; if retries are not allowed, the whole Stage is aborted, otherwise the whole Stage is resubmitted. In addition, the location and map-task information related to that fetch is removed from the Stage and deregistered from the MapOutputTracker. Finally, if the blockManagerId of that fetch is not empty, an ExecutorLost is processed, so that the next shuffle runs on another Executor. Other Task states are handled by the TaskScheduler, such as Exception, TaskResultLost, and commitDenied.

  • Other Job-related operations include cancelling a Job, cancelling a Stage, and resubmitting a failed Stage.

TaskScheduler

Maintains mappings between tasks and executors, between executors and physical resources, between queued tasks and running tasks.

Internally, a task queue is maintained and tasks are scheduled according to FIFO or Fair policies.
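
The scheduling policy can be chosen through the spark.scheduler.mode configuration property; a minimal Scala sketch (the application name is a placeholder):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("SchedulingDemo")           // placeholder application name
  .setMaster("local")
  .set("spark.scheduler.mode", "FAIR")    // the default is FIFO
val sc = new SparkContext(conf)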

SchedulerBackend

Below the TaskScheduler, the SchedulerBackend is used to interface with different resource management systems. SchedulerBackend is an interface, with a separate implementation for each kind of cluster manager.

The operating architecture of Spark in different clusters

Spark focuses on building a good ecosystem. It not only supports multiple external file storage systems but also provides a variety of cluster operation modes. When deployed on a single machine, it can run either in Local mode or in pseudo-distributed mode. When deployed on a distributed cluster, you can choose Standalone mode (provided by Spark itself), yarn-client mode, or yarn-cluster mode according to the actual situation of the cluster. Although the various running modes of Spark differ in startup mechanism, running location, and scheduling policy, their purpose is the same: to run and manage tasks reliably in the appropriate location according to the user's configuration and the Job's requirements.

Spark on Standalone Runs the process

Standalone mode is a resource-scheduling framework implemented by Spark itself. It consists of Client nodes, a Master node, and Worker nodes. The Driver can run either on the Master node or on the local Client. When spark-shell is used to submit a Spark Job, the Driver runs on the Master node; when the spark-submit tool is used, or the Job is run from a development platform such as Eclipse or IDEA with "new SparkConf().setMaster("spark://master:7077")", the Driver runs on the local Client.
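
A minimal sketch of the second case, where the Driver runs on the local Client and connects to the Standalone Master (the master address, app name, and file path are placeholders):

import org.apache.spark.{SparkConf, SparkContext}

object StandaloneClientDemo {
  def main(args: Array[String]): Unit = {
    // The Driver runs here on the local Client and registers with the Standalone Master
    val conf = new SparkConf()
      .setAppName("StandaloneClientDemo")
      .setMaster("spark://master:7077")
    val sc = new SparkContext(conf)
    println(sc.textFile("/spark/hello.txt").count())   // a trivial job, just to trigger execution
    sc.stop()
  }
}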

  • We submit a task; this task is called an Application.
  • Initialize the program's entry point, the SparkContext:
    • Initialize the DAG Scheduler
    • Initialize the Task Scheduler
  • The Task Scheduler registers with the Master and requests resources (CPU cores and memory).
  • The Master decides which Workers to allocate resources on, based on the resource request from the SparkContext and the information reported in the Workers' heartbeat cycles, acquires the resources on those Workers, and then starts StandaloneExecutorBackend; a thread pool is initialized.
  • StandaloneExecutorBackend registers with the Driver (SparkContext), so that the Driver knows which Executors serve it. At this point initialization is almost complete; the Transformation code starts executing, but it does not actually run until an Action is hit, which creates a Job and divides it into Stages.
  • The SparkContext sends the Application code to StandaloneExecutorBackend; it also parses the Application code, builds the DAG graph, and hands it to the DAG Scheduler to decompose into Stages (a Job is generated when an Action is encountered; each Job contains one or more Stages, which are generally created before external data is fetched or a shuffle happens).
  • Each Stage (i.e. TaskSet) is submitted to the Task Scheduler, which assigns Tasks to the corresponding Workers and finally submits them to StandaloneExecutorBackend for execution.
  • Tasks are serialized and allocated according to the task-allocation algorithm.
  • The received Tasks are deserialized and wrapped in threads.
  • Tasks start executing and report to the SparkContext until they complete.
  • Resources are released.

Graphical description of running process

Spark on YARN Operation process

YARN is a unified resource-management mechanism on which multiple computing frameworks can run. In today's big-data world, most companies use other computing frameworks, such as MapReduce and Storm, alongside Spark, either for historical reasons or because of the performance requirements of particular workloads. Spark developed the Spark on YARN running mode for exactly this situation. The elastic resource management of YARN makes Application deployment more convenient, and the services running in the YARN cluster are isolated from the Application's resources. More practically, YARN manages the multiple services running in a cluster by means of queues.

Depending on where the Driver runs in the cluster, Spark on YARN has two modes: yarn-client mode and yarn-cluster (also called yarn-standalone) mode.