“This is the 31st day of my participation in the November Gwen Challenge. See details of the event: The Last Gwen Challenge 2021”.
I. Overview of Spark
1.1. What is Spark
Spark is a memory-based fast, versatile, and scalable computing engine for big data analysis.
1.2. Spark and Hadoop
In previous studies, Hadoop's MapReduce was the best-known computing framework, so why do we need to learn Spark? To answer that, we have to talk about the relationship between Spark and Hadoop.
First, from the time node:
- Hadoop
  - In January 2006, Doug Cutting joined Yahoo to lead the development of Hadoop
  - In January 2008, Hadoop became an Apache top-level project
  - Version 1.0 was officially released in 2011
  - A stable version was released in March 2012
  - The 2.X (YARN) version was released in October 2013
- Spark
  - Spark was born in 2009 at the AMPLab at UC Berkeley
  - In 2010, Berkeley officially open-sourced the Spark project
  - In June 2013, Spark became a project under the Apache Foundation
  - In February 2014, Spark quickly became an Apache top-level project
  - Since 2015, Spark has become increasingly popular, with a large number of domestic companies deploying or using Spark
Next, let's look at them from a functional point of view:

- Hadoop
  - Hadoop is an open-source framework written in Java that stores massive amounts of data and runs distributed analysis applications on distributed server clusters
  - As the Hadoop distributed file system, HDFS is the lowest layer of the Hadoop ecosystem, storing all the data and supporting all Hadoop services. Its theoretical basis is the Google File System paper, and HDFS is an open-source implementation of GFS
  - MapReduce is a programming model that Hadoop implemented following Google's MapReduce paper. As Hadoop's distributed computing model, it is the core of Hadoop, and the framework makes it extremely easy to write distributed parallel programs. By combining the distributed storage of HDFS with the distributed computing of MapReduce, Hadoop scales out easily when processing massive data
  - HBase is an open-source implementation of Google's Bigtable, although there are many differences between the two. HBase is a distributed database built on the Hadoop Distributed File System (HDFS) that excels at random, real-time reads and writes of very large data sets, and it is a very important component of Hadoop
- Spark
  - Spark is a fast, versatile, and extensible big-data analysis engine developed in Scala
  - Spark Core provides the most basic and core functionality of Spark
  - Spark SQL is the component Spark uses to operate on structured data; with Spark SQL, users can query data using SQL or the Apache Hive SQL dialect (HQL)
  - Spark Streaming is the component of the Spark platform that performs streaming computation on real-time data and provides rich APIs for processing data streams
As you can see from the above, Spark appeared later than Hadoop and is mainly used for data computation, which is why it is often regarded as an upgraded version of the Hadoop framework.
1.3. Spark or Hadoop
Hadoop's MR framework and the Spark framework are both data-processing frameworks, so how do we choose between them?
- Hadoop MapReduce was not designed for cyclic, iterative data-flow processing, so it suffers from problems such as poor computational efficiency in scenarios with heavy parallel data reuse (machine learning, graph mining, interactive data mining, and so on). Spark was created for exactly this case: building on the ideas of the traditional MapReduce framework, it optimizes the computation process to greatly speed up data analysis, mining, and read/write operations, and it shrinks the computing unit down to the RDD, a computing model well suited to parallel computation and reuse.
- Machine-learning algorithms such as ALS and convex-optimization methods such as gradient descent all need to query and operate on a data set, or data sets derived from it, over and over again. The MR model is a poor fit here: even chaining several MR jobs serially, performance and latency remain a problem, because data sharing between jobs depends on disk. Interactive data mining is another case MR is clearly not good at, while Scala, the language Spark is built on, handles this kind of functional, iterative processing well.
- Spark is a project for fast distributed data analysis. Its core abstraction is the Resilient Distributed Dataset (RDD), which provides a richer model than MapReduce and can iterate over a data set in memory many times, supporting complex data-mining and graph-computing algorithms.
- The fundamental difference between Spark and Hadoop lies in how data is passed between multiple jobs: Spark shares data between jobs through memory, whereas Hadoop shares it through disk.
- Spark tasks start quickly: Spark forks threads for tasks, while Hadoop creates a new process for each task.
- Spark writes data to disk only during shuffle, whereas data exchange between multiple MR jobs in Hadoop always goes through disk.
- Spark's caching mechanism is more efficient than the HDFS caching mechanism (see the sketch after this list).
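To make the in-memory data sharing concrete, here is a minimal Scala sketch; the input path, file format, and parsing logic are illustrative assumptions, not part of the original text. Caching an RDD lets several jobs reuse the same in-memory data instead of re-reading it from disk, which is exactly the access pattern iterative algorithms such as ALS rely on.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object IterativeReuse {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("IterativeReuse"))

    // Parse a (hypothetical) comma-separated numeric file once and keep it in memory
    val points = sc.textFile("input/points.txt")
      .map(_.split(",").map(_.toDouble))
      .cache()

    // Both actions below reuse the cached RDD; without cache() each action
    // would re-read and re-parse the file from disk, which is the MR-style cost
    val count = points.count()
    val total = points.map(_.sum).reduce(_ + _)
    println(s"count=$count, total=$total")

    sc.stop()
  }
}
```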
Based on the above comparison, Spark does have an advantage over MapReduce in most data-computing scenarios. However, Spark is memory-based, and in actual production environments job execution may fail due to insufficient memory. In such cases MapReduce is the better choice, so Spark cannot completely replace MR.
1.4. Spark core module
- Spark Core: Spark Core provides the most basic and core functionality of Spark. Spark SQL, Spark Streaming, GraphX, and MLlib are all extensions built on top of Spark Core.
- Spark SQL: Spark SQL is the component Spark uses to operate on structured data. With Spark SQL, users can query data using SQL or the Apache Hive SQL dialect (HQL). A short sketch follows this list.
- Spark Streaming: Spark Streaming is the component of the Spark platform that performs streaming computation on real-time data and provides rich APIs for processing data streams.
- Spark MLlib: MLlib is the machine-learning algorithm library that Spark provides. Besides the algorithms, MLlib also offers extras such as model evaluation and data import, as well as some lower-level machine-learning primitives.
- Spark GraphX: GraphX is Spark's framework and library for graph computation.
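As a quick taste of the Spark SQL module described above, here is a minimal sketch. The input/user.json file and its fields are made-up assumptions for illustration, and this module needs the spark-sql_2.12 artifact added alongside spark-core_2.12.

```scala
import org.apache.spark.sql.SparkSession

object SparkSqlQuickLook {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("SparkSqlQuickLook")
      .getOrCreate()

    // Assumed input: one JSON object per line, e.g. {"name":"moe","age":20}
    val userDF = spark.read.json("input/user.json")
    userDF.createOrReplaceTempView("user")

    // Query the structured data with plain SQL
    spark.sql("select name, age from user where age > 18").show()

    spark.stop()
  }
}
```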
2. Start Spark quickly
2.1. Create a Maven project
2.1.1. Add Scala plug-ins
Spark is developed in Scala, and the development that follows uses Scala as well. The current version of Spark is 3.0.0, and it is compiled against Scala 2.12 by default, so that is the version we use. Before starting, make sure the IDEA development tool has the Scala development plug-in installed.
2.1.2. Add dependencies
Modify the POM file of the Maven project to add the Spark framework dependency. Spark 3.0.0 is used here, so pay attention to matching the versions.
```xml
<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.12</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>
```
2.1.3. WordCount
To get a feel for the Spark framework, let’s implement WordCount, one of the most common examples of big data
```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

// Create the Spark run configuration object
val sparkConf = new SparkConf().setMaster("local[*]").setAppName("WordCount")
// Create the Spark context object (the connection object)
val sc: SparkContext = new SparkContext(sparkConf)
// Read the file data
val fileRDD: RDD[String] = sc.textFile("input/word.txt")
// Split the data in the file into words
val wordRDD: RDD[String] = fileRDD.flatMap(_.split(" "))
// Convert the data structure: word => (word, 1)
val word2OneRDD: RDD[(String, Int)] = wordRDD.map((_, 1))
// Group the transformed data by word and aggregate
val word2CountRDD: RDD[(String, Int)] = word2OneRDD.reduceByKey(_ + _)
// Collect the aggregated results into driver memory
val word2Count: Array[(String, Int)] = word2CountRDD.collect()
// Print the result
word2Count.foreach(println)
// Close the Spark connection
sc.stop()
```
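For reference, if input/word.txt contained, say, the two lines `Hello Scala` and `Hello Spark` (an assumed input, not from the original), the program would print `(Hello,2)`, `(Scala,1)` and `(Spark,1)`; the order of the tuples may vary.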
Create a log4j.properties file in the resources directory of your project and add the following log configuration to keep the console output readable during execution:
```properties
log4j.rootCategory=ERROR, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n

# Set the default spark-shell log level to ERROR. When running the spark-shell, the
# log level for this class is used to overwrite the root logger's log level, so that
# the user can have different defaults for the shell and regular Spark apps.
log4j.logger.org.apache.spark.repl.Main=ERROR

# Settings to quiet third party logs that are too verbose
log4j.logger.org.spark_project.jetty=ERROR
log4j.logger.org.spark_project.jetty.util.component.AbstractLifeCycle=ERROR
log4j.logger.org.apache.spark.repl.SparkIMain$exprTyper=ERROR
log4j.logger.org.apache.spark.repl.SparkILoop$SparkILoopInterpreter=ERROR
log4j.logger.org.apache.parquet=ERROR
log4j.logger.parquet=ERROR

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
log4j.logger.org.apache.hadoop.hive.metastore.RetryingHMSHandler=FATAL
log4j.logger.org.apache.hadoop.hive.ql.exec.FunctionRegistry=ERROR
```
3. Spark operating environment
As a data-processing framework and computing engine, Spark is designed to run in all common cluster environments. In China the main production environment is Yarn, although container environments are gradually becoming popular. Next, let's look at how Spark runs in different environments.
3.1 Local mode
Local mode is an environment in which Spark code can be executed locally without the resources of any other node. It is generally used for teaching, debugging, and demonstration. (The environment in which code runs inside IDEA is called the development environment; that is something different.)
3.1.1. Decompress the file
Upload the spark-3.0.0-bin-hadoop3.2.tgz file to Linux, decompress it, and place it in a path that contains no Chinese characters or spaces.
```bash
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-local
```
3.1.2. Start the Local environment

- Go to the decompressed directory and run the following command:

```bash
bin/spark-shell
```

- After it starts successfully, you can open the Web UI monitoring page in a browser:

```
http://<vm-address>:4040
```
3.1.3 Command Line Tools
Put a word.txt file into the data directory of the decompressed folder, then execute the following instruction in the command-line tool (it is the simplified equivalent of the IDEA code):

```scala
sc.textFile("data/word.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
```
3.1.4 Exit the local mode
Press Ctrl+C, or enter the Scala instruction:

```scala
:quit
```
3.1.5. Submit the application
```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local[2] \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
- --class indicates the main class of the program to be executed
- --master local[2] means local deployment; the number in brackets is the number of virtual CPU cores to allocate
- spark-examples_2.12-3.0.0.jar is the jar package containing the application class to run; in practice you would point this at your own jar
- The trailing number 10 is the input argument of the program, which here sets the number of tasks for the application (a simplified sketch of how the example uses it follows this list)
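For intuition about what the example does with that argument, here is a simplified Scala sketch in the spirit of SparkPi (it is not the actual example source shipped with Spark): the trailing argument becomes the number of slices, and each slice estimates π by random sampling. Because the application is launched with spark-submit, no master is hardcoded in the code.

```scala
import scala.util.Random
import org.apache.spark.{SparkConf, SparkContext}

object MiniSparkPi {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("MiniSparkPi"))

    // The trailing "10" in the spark-submit command arrives here as args(0)
    val slices = if (args.nonEmpty) args(0).toInt else 2
    val n = 100000L * slices

    // Each partition samples random points in the unit square and counts those inside the unit circle
    val inside = sc.parallelize(1L to n, slices).map { _ =>
      val x = Random.nextDouble() * 2 - 1
      val y = Random.nextDouble() * 2 - 1
      if (x * x + y * y <= 1) 1 else 0
    }.reduce(_ + _)

    println(s"Pi is roughly ${4.0 * inside / n}")
    sc.stop()
  }
}
```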
3.2 Standalone Mode
Standalone mode is Spark's own standalone deployment mode, in which applications run using only Spark's own nodes for compute resources. Spark's Standalone mode is a classic master-slave architecture.
Cluster planning:
|  | hadoop102 | hadoop103 | hadoop104 |
| --- | --- | --- | --- |
| Spark | Master, Worker | Worker | Worker |
3.2.1. Decompress the file
Upload the spark-3.0.0-bin-hadoop3.2.tgz file to Linux and decompress it to the specified location.
```bash
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-standalone
```
3.2.2. Modify the configuration files
- Go to the conf directory of the decompressed path and rename the slaves.template file to slaves:

```bash
mv slaves.template slaves
```

- Modify the slaves file and add the worker nodes:

```
hadoop102
hadoop103
hadoop104
```

- Rename the spark-env.sh.template file to spark-env.sh:

```bash
mv spark-env.sh.template spark-env.sh
```

- Modify the spark-env.sh file to add the JAVA_HOME environment variable and the master node of the cluster:

```bash
export JAVA_HOME=/opt/module/jdk1.8.0_212
SPARK_MASTER_HOST=hadoop102
SPARK_MASTER_PORT=8020
```

- Distribute the spark-standalone directory:

```bash
xsync spark-standalone
```
3.2.3. Start the cluster
- Execute the script command:

```bash
sbin/start-all.sh
```

- View the running processes on the three servers

- View the Master resource-monitoring Web UI at http://hadoop102:8080
3.2.4. Submit the application
```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:8020 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
By default, the application uses the total number of cores available on the cluster nodes, and each executor gets 1024 MB of memory (see the sketch below for how to override these defaults).
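If you do not want an application to take every core, or you want more than the default 1024 MB per executor, the defaults can be overridden from the application side. A minimal sketch follows; the values 2g and 4 are arbitrary examples, and the same effect can be achieved with the spark-submit options --executor-memory and --total-executor-cores.

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("TunedApp")
  .set("spark.executor.memory", "2g")  // memory per executor instead of the default 1024 MB
  .set("spark.cores.max", "4")         // cap on the total cores the application takes from the standalone cluster
val sc = new SparkContext(conf)
```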
3.2.5 Description of submission parameters
When submitting an application, several parameters are usually passed:

```bash
bin/spark-submit \
--class <main-class> \
--master <master-url> \
... # other options
<application-jar> \
[application-arguments]
```
3.2.6. Configure the history service
After spark-shell is stopped, the hadoop102:4040 monitoring page can no longer show the state of completed tasks, so during development we configure a history server to record the running state of historical tasks.
- Rename the spark-defaults.conf.template file to spark-defaults.conf:

```bash
mv spark-defaults.conf.template spark-defaults.conf
```

- Modify the spark-defaults.conf file and configure the log storage path (a code-side sketch of these two settings follows at the end of this section):

```properties
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://hadoop102:8020/directory
```

  Note: the Hadoop cluster must be started, and the directory must already exist on HDFS:

```bash
sbin/start-dfs.sh
hadoop fs -mkdir /directory
```

- Modify the spark-env.sh file to add the log configuration:

```bash
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
-Dspark.history.retainedApplications=30"
```

  - Parameter 1: the port for accessing the web UI is 18080
  - Parameter 2: the path where the history server logs are stored
  - Parameter 3: the number of applications to retain; when this number is exceeded, information about the oldest applications is deleted. This is the number of applications kept in memory, not the number shown on the page.

- Distribute the configuration files:

```bash
xsync conf
```

- Restart the cluster and the history service:

```bash
sbin/start-all.sh
sbin/start-history-server.sh
```

- Re-run the task:

```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:8020 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```

- Check the history service at http://hadoop102:18080
Note: the Master port 8020 used above is changed to 7077 later on, because 8020 is the default communication port of Hadoop HDFS; using 7077 prevents a port conflict!
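The event-log settings above live in spark-defaults.conf, but the same configuration keys can also be set per application from code. A minimal sketch, assuming the same HDFS path as above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("LoggedApp")
  .set("spark.eventLog.enabled", "true")                        // write event logs for this application
  .set("spark.eventLog.dir", "hdfs://hadoop102:8020/directory") // must match the directory the history server reads
val sc = new SparkContext(conf)
```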
3.2.7 Configuring HA
High availability is needed because the cluster has only one Master node, which creates a single point of failure. To solve this, the cluster is configured with multiple Master nodes: once the active Master fails, a standby Master takes over so that jobs can keep running. The high-availability setup here uses Zookeeper.
Cluster planning:
|  | hadoop102 | hadoop103 | hadoop104 |
| --- | --- | --- | --- |
| Spark | Master, Zookeeper, Worker | Master, Zookeeper, Worker | Zookeeper, Worker |
- Stop the cluster:

```bash
sbin/stop-all.sh
sbin/stop-history-server.sh
```

- Start Zookeeper:

```bash
[moe@hadoop102 spark-standalone]$ zk.sh start
```

- Modify the spark-env.sh file and add the following configuration:

```bash
# Comment out the following:
# SPARK_MASTER_HOST=hadoop102
# SPARK_MASTER_PORT=7077

# The default port of the Master monitoring page is 8080, which may conflict with Zookeeper, so change it to 8989
SPARK_MASTER_WEBUI_PORT=8989

export SPARK_DAEMON_JAVA_OPTS="
-Dspark.deploy.recoveryMode=ZOOKEEPER
-Dspark.deploy.zookeeper.url=hadoop102,hadoop103,hadoop104
-Dspark.deploy.zookeeper.dir=/spark"
```

- Distribute the configuration files:

```bash
[moe@hadoop102 spark-standalone]$ xsync conf/
```

- Start the cluster:

```bash
sbin/start-all.sh
sbin/start-history-server.sh
```

- Start a separate Master on hadoop103; its Master state will be standby:

```bash
[moe@hadoop103 spark-standalone]$ sbin/start-master.sh
```

- Submit the application to the high-availability cluster (see the sketch after this list for the application-side master URL):

```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master spark://hadoop102:7077,hadoop103:7077 \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```

- Stop the Master resource-monitoring process on hadoop102 (7479 here is the Master process ID):

```bash
[moe@hadoop102 spark-standalone]$ kill -9 7479
```

- Check the Master resource-monitoring Web UI of hadoop103; after a while, the Master status of hadoop103 becomes active.
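With the HA setup in place, an application can list every Master in its master URL; it registers with whichever Master is active and fails over automatically if that Master dies. A minimal sketch using the hosts from the cluster plan above:

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Both Masters are listed; only the active one accepts the registration,
// and the driver reconnects to the new active Master after a failover
val conf = new SparkConf()
  .setAppName("HaApp")
  .setMaster("spark://hadoop102:7077,hadoop103:7077")
val sc = new SparkContext(conf)
```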
3.3. Yarn Mode
In Standalone mode, Spark provides its own computing resources without relying on any other framework. This reduces coupling with third-party resource frameworks and keeps Spark independent. But remember that Spark is primarily a computing framework, not a resource-scheduling framework, so providing resource scheduling itself is not its strong point; it is better to integrate it with a professional resource-scheduling framework. So let's look at how Spark runs in the widely used Yarn environment (widely used because, in domestic production environments, Yarn is used a great deal).
3.3.1. Decompress the file
```bash
tar -zxvf spark-3.0.0-bin-hadoop3.2.tgz -C /opt/module
cd /opt/module
mv spark-3.0.0-bin-hadoop3.2 spark-yarn
```
3.3.2 Modifying the configuration file
- Modify the Hadoop configuration file /opt/module/hadoop/etc/hadoop/yarn-site.xml:

```xml
<!-- Whether to start a thread that checks the amount of physical memory each task is using;
     if a task exceeds the allocated value, it is killed. Default is true -->
<property>
    <name>yarn.nodemanager.pmem-check-enabled</name>
    <value>false</value>
</property>

<!-- Whether to start a thread that checks the amount of virtual memory each task is using;
     if a task exceeds the allocated value, it is killed. Default is true -->
<property>
    <name>yarn.nodemanager.vmem-check-enabled</name>
    <value>false</value>
</property>
```

  Note: distribute the modified file to the other nodes.

- Modify conf/spark-env.sh to add the JAVA_HOME and YARN_CONF_DIR configuration:

```bash
mv spark-env.sh.template spark-env.sh

export JAVA_HOME=/opt/module/jdk1.8.0_212
YARN_CONF_DIR=/opt/module/hadoop-3.1.3/etc/hadoop
```
3.3.3. Start the HDFS and YARN cluster
```bash
[moe@hadoop102 hadoop]$ myhadoop.sh start
```
3.3.4. Submit the application
```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```
Open the http://hadoop103:8088 page and click History to view the history page.
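Note that when an application is submitted with --master yarn, the application code itself should not hardcode setMaster; the master comes from the submit command. A sketch of what the driver of such an application might look like (the argument-based input and output paths are assumptions for illustration):

```scala
import org.apache.spark.{SparkConf, SparkContext}

object YarnWordCount {
  def main(args: Array[String]): Unit = {
    // No setMaster here: spark-submit --master yarn decides where this runs
    val sc = new SparkContext(new SparkConf().setAppName("YarnWordCount"))

    sc.textFile(args(0))            // e.g. an HDFS input path passed as the first argument
      .flatMap(_.split(" "))
      .map((_, 1))
      .reduceByKey(_ + _)
      .saveAsTextFile(args(1))      // output path passed as the second argument

    sc.stop()
  }
}
```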
3.3.5. Configure the history Server
- Rename the spark-defaults.conf.template file to spark-defaults.conf:

```bash
mv spark-defaults.conf.template spark-defaults.conf
```

- Modify the spark-defaults.conf file and configure the log storage path:

```properties
spark.eventLog.enabled  true
spark.eventLog.dir      hdfs://hadoop102:8020/directory
```

- Modify the spark-env.sh file to add the log configuration:

```bash
export SPARK_HISTORY_OPTS="
-Dspark.history.ui.port=18080
-Dspark.history.fs.logDirectory=hdfs://hadoop102:8020/directory
-Dspark.history.retainedApplications=30"
```

- Modify spark-defaults.conf:

```properties
spark.yarn.historyServer.address=hadoop102:18080
spark.history.ui.port=18080
```

- Start the history service:

```bash
sbin/start-history-server.sh
```

  If an error is reported, make sure the /directory path has been created on HDFS.

- Resubmit the application:

```bash
bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
./examples/jars/spark-examples_2.12-3.0.0.jar \
10
```

- View the logs from the Web page at http://hadoop102:8088
3.4 K8S & Mesos mode
Mesos is an open-source distributed resource-management framework under Apache, known as the kernel of distributed systems. It is widely used at Twitter, managing application deployments on more than 300,000 Twitter servers. In China, however, the traditional Hadoop big-data stack is still dominant, so Mesos is not used much domestically; the principles are similar, and we will not go into it further here.
Container deployment is currently a very popular technology in the industry; running from Docker images makes it easier for users to manage, operate, and maintain applications. The most popular container-management tool is Kubernetes (K8S), and recent Spark releases already support the K8S deployment mode. We won't go into detail here either; here is a link for you to explore on your own: spark.apache.org/docs/latest…
3.5. Windows Mode
When studying on your own, having to start a virtual machine and a cluster every time is a cumbersome process that takes up a lot of system resources and slows the machine down, which hurts both the learning experience and your progress. Helpfully, Spark can start a local environment on Windows, so you can learn the Spark fundamentals without touching a virtual machine.
3.5.1 Decompress the file
Decompress the spark-3.0.0-bin-hadoop3.2.tgz file to a path that contains no Chinese characters or spaces.
3.5.2. Start the local environment

- Run the spark-shell.cmd file in the bin directory to start the Spark local environment

- Create an input directory in the bin directory, add a word.txt file, and enter the script code on the command line:

```scala
sc.textFile("input/word.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
```
3.5.3 Submitting applications on the CLI
```bash
spark-submit --class org.apache.spark.examples.SparkPi --master local[2] ../examples/jars/spark-examples_2.12-3.0.0.jar 10
```
3.6. Comparison of deployment modes
3.7. Port numbers

- Port for viewing the currently running Spark tasks in spark-shell: 4040 (computation)
- Spark Master internal communication service port: 7077
- Spark Master Web UI port in Standalone mode: 8080 (resources)
- Spark history server port: 18080
- Hadoop YARN task status page port: 8088