Design and operation principles of Spark

Operating architecture of Spark

The basic concept

  • RDD: Elastic distributed data set and immutable data set. An RDD includes operators, dependencies, partitions, partition list, and partition location
  • DAG: directed acyclic graph, reflecting the dependencies between RDD
  • Executor: A process that runs on a worker node and is responsible for running tasks and storing data for applications
  • Application: Spark Application written by a user
  • Task: Unit of work running on Excutor
  • Job: A Job contains multiple RDDS and the operations that operate on the corresponding RDDS. An action operator generates a Job
  • Stage: A Job can be divided into multiple stages based on width and width. A Stage contains multiple tasks

Architecture design

The Spark operating architecture consists of a Cluster Manager, a Worker Node for running job tasks, a Driver for each application, and an Excutor for each work Node.

Spark’s Excutor has the following features:

  • Use multithreading to perform specific tasks and reduce the cost of tasks
  • Excutor has a BlockManager storage module, which uses the memory and disks as storage devices. When Spark is running, intermediate results can be stored in this storage module. When needed, data in this storage module can be read directly instead of reading and writing data to file systems such as HDFS, reducing I/O overhead

In Spark, an Application consists of a Task node and several jobs. A Job consists of stages, and a phase consists of multiple tasks. When executing an application, the task control node requests resources from the Cluster Manager, starts Excutor, sends application code and files to Excutor, and then executes tasks on Excutor. When the execution is complete, the execution results are returned to the task control node. Or write it to a storage system such as HDFS

Spark runs basic processes

The basic operation process of Spark is as follows:

  • When a Spark application is submitted, you need to build a basic running environment for the application, that is, create a SparkContext by the task control node (Driver). SparkContext is responsible for the communication with the Cluster Manager, resource application, task allocation and monitoring, etc. SparkContext registers with the resource manager and requests resources to run Executor
  • The resource manager allocates resources to the Executor and starts the Executor process, and the Executor runtime is sent to the resource manager in a heartbeat
  • SparkContext constructs the DAG graph according to the RDD dependencies, which is submitted to the DAG scheduler for parsing. The DAG graph is decomposed into multiple “stages” (each stage is a task set), and the dependencies among each stage are calculated. The “task sets” are then submitted to the underlying TaskScheduler for processing; Executor requests tasks from SparkContext, the task scheduler assigns tasks to Executor to run, and SparkContext assigns application code to Executor
  • Tasks run on Executor, feed back the results of their execution to the task scheduler, which then feeds back to the DAG scheduler, which writes data and frees all resources

Design and operation principle of RDD

RDD concept

RDD provides an abstract data structure, we don’t have to worry about the distribution characteristic of the underlying data need to transform into a series of specific application logic expressions, the conversion between different RDD operation form dependencies, can realize the pipelining, avoiding the intermediate result of storage, greatly reduces the data replication, disk IO and serialization overhead.

A RDD is a collection of distributed object that is essentially a read-only partition record collection, each RDD may be divided into several partitions, each partition is a piece of data set, and a RDD different partitions can be saved to different nodes in the cluster, which can be performed at different nodes in the cluster parallel computing.

RDD provides a highly constrained shared memory model, that is, an RDD is a collection of read-only record partitions that cannot be modified directly, but can only be created from a set of data in stable physical storage or by performing certain transformation operations (such as map, Join, and groupBy) on other RDD’s.

RDD provides a rich set of operations to support common data operations, divided into “actions,” which perform calculations and specify the form of output, and “transformations,” which specify the interdependencies between RDD’s. The main difference between the two types of operations is that transformation operations (such as Map, filter, groupBy, join, etc.) accept RDD and return RDD, while action operations (such as count, Collect, etc.) accept RDD but return non-RDD (that is, output a value or result).

Lazy call RDD was adopted, namely in the execution process of RDD (as shown in figure 9 to 8), the real calculation in RDD “action” operation, for all “transformation” before the “action” operation, the Spark is recorded “transformation” operation application of some basic data set and RDD generated trajectory, namely the dependencies between each other, It doesn’t trigger the actual calculation.

The above series of processing is called a Lineage, that is, the result of DAG topological sequencing. With inert calls, a series of linked by blood relationship RDD operation can be achieved pipelining (pipeline), to avoid the conversion operation between the data synchronization of waiting, and don’t have to worry about too many intermediate data, because these has the blood relationship of pipelining, an operating results do not need to save for the intermediate data, Instead, it pipes directly into the next operation for processing. At the same time, this design method of pipelining a series of operations through blood relationship also makes the calculation of each operation in the pipeline relatively simple, and ensures the single processing logic of each operation. In contrast, MapReduce is designed to minimize MapReduce processes by writing too much complex logic into a single MapReduce.

RDD features

RDD in Spark provides efficient computing for the following reasons:

  • Efficient fault tolerance. Existing distributed shared memory, key-value storage, in-memory database, etc., must be replicated or logged between cluster nodes to achieve fault tolerance. In other words, a large number of data transfers will occur between nodes, which will bring a lot of overhead for data-intensive applications. In the design of RDD, the data is read-only and cannot be modified. If the data needs to be modified, it must be converted from the parent RDD to the child RDD, thus establishing the blood relationship between different RDD. RDD, therefore, is a kind of natural fault tolerance mechanism of special collections, don’t need (checkpoint, for example) by means of data redundancy to realize fault tolerance, and only the RDD relationships depend on the (blood) to calculate the lost partition to realize fault tolerance, don’t need to roll back the entire system, thus avoiding the data replication of high cost, Moreover, the recalculation process can be carried out in parallel between different nodes to achieve efficient fault tolerance. In addition, RDD provides coarse-grained transformation operations (such as map, filter, and Join), and RDD dependencies only need to log coarse-grained transformation operations, rather than logging specific data and various fine-grained operations (such as which data item was modified). This greatly reduces the fault-tolerant overhead in data-intensive applications;
  • Intermediate results are persisted to memory. Data is transferred between multiple RDD operations in memory without landing on the disk, avoiding unnecessary disk read and write overhead.
  • The stored data can be Java objects, avoiding unnecessary object serialization and deserialization overhead.

Dependencies between RDD

Different operations in the RDD will result in different dependencies for partitions in different RDD. RDD Dependency is divided into Narrow Dependency and Wide Dependency.

  • Narrow dependencies are represented as partitions of one parent RDD corresponding to partitions of one child RDD, or partitions of multiple parent RDD corresponding to partitions of one child RDD. That is, one father has only one child, and there is no rezoning.
  • The wide dependency shows that one partition of a parent RDD corresponds to multiple partitions of a child RDD. One father has more than one child

Collaborative division and non-collaborative division:

  • The collaborative partition of input belongs to narrow dependence. Co-partitioned means that all the “keys” of a partition of multiple parent RDD fall into the same partition of the child RDD, without producing a partition of the same parent RDD, but fall into two partitions of the child RDD.
  • Non-cooperative partition of input belongs to wide dependence.

For RDD with narrow dependencies, all parent partitions can be calculated in a pipelined manner without mixing data between networks. For RDD with wide dependence, Shuffle is usually performed. That is, all parent partition data needs to be calculated first and Shuffle is performed between nodes.

Spark’s dependency design makes it naturally fault-tolerant, greatly speeding up The execution of Spark. Because, RDD data set by “blood ties” remember how it evolved from other RDD, related record is the transformation behavior of coarse granularity, when the part of the RDD partition data loss, it can get enough information through related to operation and restore the lost data partition, the resulting performance improvement. In the two dependencies, narrow dependency failure recovery is more efficient, requiring only recalculation of the lost partitions against the parent RDD partition (not all partitions) and recalculation on different nodes in parallel. For wide dependencies, a single node failure usually means a recalculation process involving multiple parent RDD partitions, which is expensive. In addition, Spark provides data checkpoints and logging for persistent intermediate RDD, which eliminates the need to go back to the beginning of a failed recovery. During fault recovery, Spark compares the cost of data checkpoint with the cost of recalculating RDD partitions to automatically select an optimal recovery policy.

RDD running process

The running process of RDD in Spark is as follows:

  1. Create an RDD object
  2. SparkContext is responsible for calculating the dependencies between RDD and building a DAG
  3. DAGScheduler is responsible for breaking up the DAG diagram into stages, each of which contains multiple tasks, each of which is distributed by the task scheduler to executors on Worker nodes for execution

Spark Programming Basics

RDD programming

RDD operation

RDD has two operations:

  • Transform operation: Create a new dataset based on an existing dataset
    • Filter (func) : Filter out elements that satisfy function func and return a new dataset
    • Map (func) : Passes each element to the function func and returns the result as a new dataset
    • FlatMap (func) : Similar to map(), but each input element can map to zero or more output results, but the final results are put together
    • GroupByKey () : When applied to a dataset of (K,V) key-value pairs, returns a new (K, Iterable) dataset
    • ReduceByKey (FUNc) : When applied to a dataset of (K,V) key-value pairs, return a new (K,V) form of the dataset where each value is passed each key into the function func for aggregation
  • Action operation: Perform operations on a data set and return calculated values
    • Count () : Returns the number of elements in the dataset
    • Collect () : Returns all the elements in the data set as an array
    • First () : Returns the first element in the dataset
    • Take (n) : Returns the first n elements of the dataset as an array
    • Reduce (func): Aggregates elements of a dataset through the function func, which takes two arguments and returns a value
    • Foreach (func): Each element in the dataset is passed to the function func to run

persistence

RDD uses a lazy evaluation mechanism, where each action is evaluated from scratch. If you need to invoke a different action, each action will trigger a calculation from the beginning. This can be costly for iterative computation.

To avoid this overhead of double-counting through persistence (caching) mechanisms. You can use the persist() method to mark an RDD for persistence because the RDD is not computed and persisted immediately in places where the persist() statement occurs, but is not persisted until the first action triggers the actual calculation. The persisted RDD is retained in the memory of the compute node for reuse by subsequent operations. The parentheses of persist() contain persistent-level parameters, such as persist(MEMORY_ONLY), which means that the RDD is stored in the JVM as a deserialized object, and that if memory runs out, the contents of the cache are replaced by the LRU principle. Persist (MEMORY_AND_DISK) means that the RDD is stored in the JVM as a deserialized object, and if memory runs low, the excess partition will be stored on disk. Generally, when the cache() method is used, persist(MEMORY_ONLY) is called.

The persistent RDD can be manually removed from the cache using the unpersist() method.

Key/value pair RDD

Common conversion operations for key-value pairs:

  • ReduceByKey (FUNc) : The function of reduceByKey(func) is to combine values that have the same key using the FUNc function. For example, reduceByKey((a,b) => A + B), which has four key-value pairs (” Spark “,1), (” Spark “,2), (” Hadoop “,3) and (” hadoop “,5), the result of combining key-value pairs with the same key is as follows: (the “spark”, 3), (” hadoop, 8). It can be seen that (a,b) => a+b In the Lamda expression, both a and b refer to value. For example, for two key-value pairs with the same key (” spark “,1) and (” spark “,2), A is 1 and B is 2.
  • GroupByKey () : groupByKey() groups values that have the same key. For example, the four key/value pair (” spark “, 1), (” spark “, 2), (” hadoop “, 3) and (” hadoop “, 5), using groupByKey (after) the result is: (” spark “, (1, 2)) and (” hadoop “, (3, 5)).
  • Keys () : Keys only return the key value to the key in the RDD to form a new RDD. For example, for an RDD consisting of four key pairs (” spark “,1), (” spark “,2), (” hadoop “,3), and (” hadoop “,5), the result is an RDD[Int] with the contents {” spark “, “spark”, “hadoop”, “hadoop”,” Hadoop “}.
  • Values () : Values only returns the key value to the value in the RDD to form a new RDD. For example, for an RDD consisting of four key pairs (” Spark “,1), (” spark “,2), (” Hadoop “,3), and (” hadoop “,5), the result is an RDD[Int] with the contents {1,2,3,5}.
  • SortByKey () : sortByKey() returns an RDD sorted by key
  • MapValues (func) : This function applies a function to each value in the key-value pair RDD, but the key does not change. For example, for pairRDD consisting of four key pairs (” spark “,1), (” spark “,2), (” hadoop “,3), and (” hadoop “,5), if you execute pairrdd.mapvalues (x => x+1), you will get a new key value pairRDD. It includes the following four key/value pair (” spark “, 2), (” spark “, 3), (” hadoop “, 4) and (” hadoop “, 6).

Shared variables

By default, when Spark runs a function on multiple tasks on multiple nodes in a cluster in parallel, it generates a copy of each variable involved in the function on each task.

Broadcast variables are used to share variables between memory on all nodes. The accumulator supports cumulative calculations (such as counting or summing) between all the different nodes.

Radio variable

Broadcast variables allow application developers to cache a read-only variable on each machine, rather than generating a copy for each task on the machine. In this way, it is very efficient to provide each node (machine) with a copy of a large input data set. Spark’s Actions span multiple stages. Spark automatically broadcasts the public data required by all tasks in each stage. Variables propagated by broadcast are serialized and then deserialized when they are used by a task. This means that explicitly creating broadcast variables is only useful when tasks that span multiple phases require the same data, or when it is important to cache the data in a deserialized manner.

A broadcast variable can be created from a normal variable v by calling sparkContext.broadcast (v). The broadcast variable is a wrapper around the ordinary variable v, and the value of the broadcast variable can be obtained by calling the value method, for example

scala> val broadcastVar = sc.broadcast(Array(1.2.3))
broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
scala> broadcastVar.value
res0: Array[Int] = Array(1.2.3)
Copy the code

Once the broadcast variable is created, use the broadcastVar value instead of the v value in any function in the cluster so that v is not re-distributed to these nodes. In addition, once the broadcast variable is created, the value of the ordinary variable v cannot be changed to ensure that all nodes get the same value for the broadcast variable.

accumulator

An accumulator is a variable that is only accumulated by related operations and can often be used to implement a counter (counter) and a sum (sum).

A numeric type accumulator, by calling the SparkContext. LongAccumulator () or SparkContext. DoubleAccumulator () to create. Tasks running in the cluster can use the add method to add values to the accumulator. However, these tasks can only do the accumulative operation and cannot read the value of the accumulator. Only the Driver Program can use the value method to read the value of the accumulator.

Spark SQL

Difference between DataFrame and RDD

DataFrame enables Spark to process large-scale structured data, which is easier to use than the original RDD conversion method and achieves higher computing performance.

RDD is a collection of objects, but the RDD does not know the structure of the object. A DataFrame is an RDD-based distributed data set, that is, a collection of distributed Row objects (each Row object represents a Row of records) that provides detailed structural information, known as a schema. Spark SQL clearly knows which columns the data set contains, and the name and type of each column.

The DataFrame variation operations also use a lazy mechanism, recording only the logical transformation roadmap of the various transformations (a DAG diagram), and no real calculation takes place. The DAG diagram acts as a logical query plan, which is eventually translated into a physical query plan to generate the RDD DAG.

Spark Streaming

Flow calculation

The stream computing process includes real-time data acquisition, real-time data calculation and implementation of query services

  • Real-time data collection: The stage of real-time data collection usually collects massive data from multiple data sources, which needs to ensure real-time performance, low delay, stability and reliability.
  • Real-time data calculation: the stream processing system receives the real-time data constantly sent by the data acquisition system, carries out real-time analysis and calculation, and feeds back the real-time results.
  • Real-time query service: The third stage of stream computing is real-time query service. The results obtained through the stream computing framework can be queried, displayed or stored in real time by users.

SparkStreaming design

Spark Streaming the basic principle of Spark Streaming is to split real-time input data streams into time slices (in seconds), and then process each time slice through the Spark engine in a batch-like manner.

The main abstraction of Spark Streaming is DStream (Discretized Stream), which represents continuous data Stream. Internally, the Spark Streaming input data is divided into dStreams in time slices (for example, 1 second). Each data segment is converted to the RDD in Spark, and operations on the DStream are eventually converted to operations on the corresponding RDD.

Park Streaming cannot achieve millisecond stream calculations because it breaks the stream data into a series of batch jobs by batch window size (typically between 0.5 and 2 seconds), during which multiple Spark jobs are generated, In addition, each piece of data is processed through the Spark DAG graph decomposition and task scheduling process. Therefore, millisecond level correlation cannot be implemented. Spark Streaming is not suitable for scenarios with high real-time requirements (such as high-frequency real-time trading), but it is competent for other Streaming quasi-real-time computing scenarios.

How SparkStream works

In Spark Streaming, there is a component Receiver that runs on an Executor as a long-running task. Each Receiver is responsible for an Input DStream (such as a file stream that reads data from a file, such as a socket stream, or an input stream that reads data from a Kafka, etc.). Spark Streaming connects to external data sources through input DStream to read related data.

Basic steps for writing Spark Streaming program:

  1. The input source is defined by creating an input DStream
  2. Stream computation is defined by applying transformation operations and output operations to DStream.
  3. Use streamingContext.start() to start receiving data and processing.
  4. Through streamingContext. AwaitTermination () method to end waiting to be processed (manual or end because of the wrong end).
  5. The stream calculation process can be manually terminated by streamingContext.stop().

Spark Mlib

Machine learning Workflows

A few important concepts:

  • DataFrame: Use the DataFrame in Spark SQL as the data set, which can contain various data types. RDD contains schema information and is more similar to two-dimensional tables in traditional databases. It is used by ML Pipeline to store source data. For example, columns in a DataFrame can be stored text, feature vectors, real and predicted labels, and so on.
  • Transformer: An algorithm that converts one DataFrame into another. For example, a model is a Transformer. It can label a test data set DataFrame that does not contain a prediction tag and convert it into another DataFrame that contains a prediction tag. Technically, Transformer implements a method transform () that converts one DataFrame to another by attaching one or more columns.
  • Estimator: Translated as Estimator or Estimator, it is a conceptual abstraction of learning algorithms or training methods on training data. It is usually used in pipelines to manipulate DataFrame data and produce a Transformer. Technically, Estimator implements a method, FIT (), that takes a DataFrame and produces a converter. For example, a random forest algorithm is an Estimator, which can call FIT () to obtain a random forest model by training feature data.
  • Parameter: Parameter is used to set parameters for Transformer or Estimator. All converters and estimators now share a common API for specifying parameters. ParamMap is a set of (parameter, value) pairs.
  • PipeLine: Translated as work flow or PipeLine. Workflow links together multiple workflow stages (converters and estimators) to form a machine-learning workflow and get the result output.

To build a Pipeline workflow, each workflow phase of PipelineStage (including converters and evaluators) in a Pipeline needs to be defined, such as metric extraction and transformation model training.

val pipeline = new Pipeline().setStages(Array(stage1, stage2, stage3,...). )Copy the code

You can then start streaming the source training data by calling the Fit method of the Pipeline instance with the training data set as an input parameter. This call returns an instance of the PipelineModel class, which is used to predict the labels of the test data.

The phases of the workflow run sequentially, and the input DataFrame is converted as it passes through each phase. For the Transformer stage, the transform () method is called on the DataFrame. For the estimator phase, the FIT () method is called to generate a converter that becomes part of PipelineModel or the fitted Pipeline, and the transform () method of the converter is called on the DataFrame if there are other operations following the estimator.

Workflow itself can also be viewed as an estimator. After the workflow’s FIT () method runs, it produces a PipelineModel, which is a Transformer. This pipeline model will be used when testing data.

Reference: Spark Introduction to Big Data (Scala version)