Implementation 1: Spark Shell
Primarily used for testing, you use the collection of test data to test the flow before deploying to the cluster.
1.1 Uploading Files to HDFS
First, upload the text file to the Spark directory on the HDFS.
[root@s166 fantj]# cat spark.txt
What is “version control”, and why should you care? Version control is a system that records changes to a file or set of files over time so that you can recall specific versions later. For the examples in this book, you will use software source code as the files being version controlled, though in reality you can do this with nearly any type of file on a computer.
If you are a graphic or web designer and want to keep every version of an image or layout (which you would most certainly want to), a Version Control System (VCS) is a very wise thing to use. It allows you to revert selected files back to a previous state, revert the entire project back to a previous state, compare changes over time, see who last modified something that might be causing a problem, who introduced an issue and when, and more. Using a VCS also generally means that if you screw things up or lose files, you can easily recover. In addition, you get all this forVery little overhead. Local Version Control Systems Many people's version-control method of choice is to copy files into another directory (perhaps a time-stamped directory,ifThey're clever. This approach is very common because it is so simple, but it is also incredibly error prone. It is easy to forgetwhichThe directory you 'reinAnd accidentally write to the wrong file or copy over files you don't mean to. To deal with this issue, programmers long ago developedlocal VCSs that had a simple database that kept all the changes to files under revision control.
Copy the code
[root@s166 fantj]# vim spark.txt
[root@s166 fantj]# hadoop fs -mkdir -p /spark
[root@s166 fantj]# hadoop fs -put spark.txt /spark
[root@s166 fantj]# hadoop fs -ls -R /spark
-rw-r--r-- 3 root supergroup 1527 2018-07-30 23:12 /spark/spark.txt
Copy the code
1.2 open the shell
[root@s166 fantj]# spark-shell
18/07/31 04:53:52 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/07/31 04:53:55 INFO spark.SecurityManager: Changing view acls to: root
18/07/31 04:53:55 INFO spark.SecurityManager: Changing modify acls to: root
18/07/31 04:53:55 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
18/07/31 04:53:58 INFO spark.HttpServer: Starting HTTP Server
18/07/31 04:53:59 INFO server.Server: jetty-8.y.z-SNAPSHOT
18/07/31 04:53:59 INFO server.AbstractConnector: Started [email protected]:36422
18/07/31 04:53:59 INFO util.Utils: Successfully started service 'HTTP class server' on port 36422.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_ / / ___ / __ / \ _, _ / _ / _ \ _ \ version 1.5.1 / _ /... . 18/07/31 04:57:28 INFO session.SessionState: Created HDFS directory: /tmp/hive/root/6814e7a5-b896-49ac-bcd8-0b94e1a4b165/_tmp_space.db 18/07/31 04:57:30 INFO repl.SparkILoop: Created sql context (with Hive support).. SQL context available as sqlContext. scala>Copy the code
1.3 Running the Scala program
sc.textFile("/spark/spark.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/spark/out")
Sc is the SparkContext object, which is the entry to submit the Spark program textFile("/spark/spark.txt"FlatMap (_.split("")) Map and then flattening Map ((_,1)) Form words and 1 into tuples reduceByKey(_+_) Reduce by key and add value saveAsTextFile("/spark/out") writes the result to HDFSCopy the code
scala> sc.textFile("/spark/spark.txt").flatMap(_.split("")).map((_,1)).reduceByKey(_+_).saveAsTextFile("/spark/out")
18/07/31 04:59:40 INFO storage.MemoryStore: ensureFreeSpace(57160) called with curMem=0, maxMem=560497950
18/07/31 04:59:40 INFO storage.MemoryStore: Block broadcast_0 stored as values inMemory (Estimated Size 55.8 KB, free 534.5 MB) 18/07/31 04:59:44 INFO Storage.MemoryStore: ensureFreeSpace(17347) called with curMem=57160, maxMem=560497950 18/07/31 04:59:44 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytesinThe memory (estimated size 16.9 KB, 534.5 MB) free 18/07/31 04:59:44 INFO storage. BlockManagerInfo: Added broadcast_0_piece0inMemory on localhost:37951 (size: 16.9KB, free: 534.5MB) 18/07/31 04:59:44 INFO Spark. SparkContext: Created broadcast 0 from textFile at <console>:22 18/07/31 04:59:49 INFO mapred.FileInputFormat: Total input paths to process : 1 18/07/31 04:59:53 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id 18/07/31 04:59:53 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id 18/07/31 04:59:53 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap 18/07/31 04:59:53 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition 18/07/31 04:59:53 INFO Configuration.deprecation: Mapred.job. id is deprecated. Instead, use mapreduce.job.id 18/07/31 05:00:51 INFO ExecutorinStage 1.0 (dar). 1165 bytes result sent to driver 18/07/31 05:00:51 INFO scheduler. TaskSetManager: Finished task 0.0inStage 1.0 (dar (1)in4730 ms on localhost (1/1) 18/07/31 05:00:51 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 18/07/31 05:00:51 INFO ResultStage 1 (saveAsTextFile at <console>:22) finishedinDAGScheduler: Job 0 Finished: saveAsTextFile at <console>:22, took 15.399221sCopy the code
1.4 Viewing the execution result
[root@s166 ~]# hadoop fs -cat /spark/out/p* (simple,,1) (nearly,1) (For,1) (back,2) (this,4) (under,1) (it,2) (means,1) (introduced,1) (revision,1) (when,,1) ... . (To,1) ((which,1) ... (prone., 1) (an, 2) (time, 1) (things, 1) (they 're, 1)... (might,1) (would,1) (issue,,1) (state,,2) (Systems,1) (System,1) (write,1) (being,1) (programmers,1)Copy the code
Implementation two: Java executes processing locally
Mainly used for temporary treatment.
pom.xml
<? xml version="1.0" encoding="UTF-8"? > <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> < modelVersion > 4.0.0 < / modelVersion > < groupId > com. Fantj < / groupId > < artifactId > bigdata < / artifactId > <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name> Bigdata </name> <description>Demo projectforSpring Boot</description> <parent> <groupId>org.springframework.boot</groupId> The < artifactId > spring - the boot - starter - parent < / artifactId > < version > 2.0.4. RELEASE < / version > < relativePath / > <! -- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> < project. Reporting. OutputEncoding > utf-8 < / project. Reporting. OutputEncoding > < Java version > 1.8 < / Java version > </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </dependency> <! --<dependency>--> <! --<groupId>org.apache.hive</groupId>--> <! --<artifactId>hive-jdbc</artifactId>--> <! - < version > 2.1.0 < / version > -- > <! --</dependency>--> <! -- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <! -- https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-common --> <dependency> <groupId>ch.cern.hadoop</groupId> The < artifactId > hadoop - common < / artifactId > < version > 2.7.5.1 < / version > < classifier > sources < / classifier > <type>java-source</type> </dependency> <dependency> <groupId> AI.H2O </groupId> <artifactId>sparkling-water-core_2.10</artifactId> The < version > 1.6.1 < / version > <type>pom</type> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> <! > > <exclusions> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-tomcat</artifactId> </exclusion> </exclusions> </dependency> <! Javax. servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> </dependencies> <build> <finalName> Wordcount </finalName> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-jar-plugin</artifactId> < version > 2.4 < / version > < configuration > < archive > < manifest > < mainClass > com. Fantj. Bigdata. WordCountCluster < / mainClass > </manifest> </archive> </configuration> </plugin> </plugins> </build> </project>Copy the code
I use Springboot to build the environment, so we need to remove springboot built-in Tomcat in POM, we do not need a container to execute Java scripts. Finally, the jar package will tell Hadoop the path of main method, no need for container. The next step is to import the hadoop Spark dependencies. Learn Maven first if you don’t have maven basics. WordCountLocal
package com.fantj.bigdata; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; import java.util.Arrays; /** * Created by Fant.J. */ public class WordCountLocal { public static void main(String[] args) { /** * Create a sparkConfig object and set the Spark application configuration information.setMaster Sets the URL of the Master node in the Spark cluster that the Spark application connects to.local/ SparkConf conf = new SparkConf().setAppName("WordCountLocal").setMaster("local"); /** * Create JavaSparkContext object (the most important object) ** / JavaSparkContext sc = new JavaSparkContext(conf); The textFile () method is used to create an RDD based on the input source of the file type. If it is HDFS or a local file, each element of the RDD created is equivalent to a line in the file. */ JavaRDD<String> lines = sc.textFile("C:\\Users\\84407\\Desktop\\spark.txt"); /** * Perform transformation to the initial RDDfunctionAnd with RDD map, flatMap and other operators to execute *functionTypically, if it's simpler, you create the specifiedfunctionAnonymous inner class * iffunctionMore complex, a separate class is created to implement thisfunctionInterface classes * now split each line into a single word * FlatMapFunction, there are two generic statements, respectively representing the input and output types * here only the function of FLatMap operator, is actually talking about one element of RDD, split into one or more elements. */ JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {@override public Iterable<String> Call (String s) throws Exception {// Splitreturn Arrays.asList(s.split("")); }}); /** * Next, you need to add up the occurrences of each word by mapping each word to a (word, 1) format * mapTopair is essentially intervening each element into a Tuple2 element (v1,v2) * If you remember scala tuple, So yes, tuple2 is a Scala type that contains two values * mapToPair operator, required to be used in conjunction with PairFunction, the first generic parameter representing the input type * the second and third generic parameter, Represents the type of the first and second values of the output of Tuple2 * JavaPairRdd's two generic parameters, JavaPairRDD<String,Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String word) throws Exception {returnnew Tuple2<String,Integer>(word,1); }}); /** * Then we need to use words as keys and count the number of occurrences of each word * Here we need to use reduceBykey operator to reduce the value corresponding to each key * For example, there are several elements in JavaPairRDD, assuming that they are (Hello, 1) (hello, 1) (hello, 1) * reduce is equivalent to evaluating the first value with the second value, and then evaluating the result with the third to * for example, helo, so this is equivalent to 1+1=2, Then 2+1=3 * returns the element in JavaPairRdd, which is also a tuple, but the first value is each key and the second value is the value of the key. JavaPairRDD<String,Integer> wordCounts = mother.reduceByKey (new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception {returnv1+v2; }}); /** * We already counted the number of words * however, we used flatMap, mapToPair, reduceByKey, which are all called transformation * In a Spark application, Transformation alone won't work, */ wordCounts. Foreach (new VoidFunction<Tuple2<String, Integer>>() { @Override public void call(Tuple2<String, Integer> wordCount) throws Exception { System.out.println(wordCount._1 +"appeared "+ wordCount._2 ); }}); }}Copy the code
We can see that there are many anonymous inner classes, which we can replace with lambda to make the code more concise.
public static void main(String[] args) {
SparkConf conf = new SparkConf().setAppName("WordCountLocal").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("C:\\Users\\84407\\Desktop\\spark.txt"); JavaRDD<String> words = lines.flatMap((FlatMapFunction<String, String>) s -> {// splitreturn Arrays.asList(s.split(""));
});
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word, 1));
JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey((Integer v1, Integer v2) -> {
return v1 + v2;
});
wordCounts.foreach((Tuple2<String, Integer> wordCount) -> {
System.out.println(wordCount._1 + "appeared " + wordCount._2);
});
}
Copy the code
Then execute the main method:
Console printing: Systemsappeared 1 examplesappeared 1 withappeared 2 inappeared 3 specificappeared 1 versionsappeared 1 recallappeared 1 copyappeared 2 Inappeared 1 VCSsappeared 1 controlled,appeared 1 Whatappeared 1 directory,appeared 1 Manyappeared 1 setappeared 1 loseappeared 1 ... . systemappeared 1 Systemappeared 1 writeappeared 1 beingappeared 1 programmersappeared 1Copy the code
Implementation three: Cluster execution
Most commonly used, it can perform offline batch processing for big data stored in the HDFS.
Preparations: Upload the spark. TXT file to the HDFS.
3.1 Modifying The Code
If you want to execute on a cluster, you need to change the code in two places:
SparkConf conf = new SparkConf().setAppName("WordCountCluster");
JavaSparkContext sc = new JavaSparkContext(conf);
JavaRDD<String> lines = sc.textFile("hdfs://s166/spark/spark.txt");
Copy the code
SetAppName is the same as the Java class name. Then change the path to the HDFS file path.
3.2 the Maven packaging
The Java projects of the second implementation need to be packaged into JARS, placed in a cluster and executed by script.
3.3 Uploading data to a Cluster
3.4 Writing an Execution Scriptwordcount.sh
[root@s166 fantj]# cat wordcount.sh
/home/fantj/spark/bin/spark-submit \
--class com.fantj.bigdata.WordCountCluster \
s--num-executors 1 \
--driver-memory 100m \
--executor-cores 1 \
/home/fantj/worldcount.jar \
Copy the code
3.5 Executing scripts
./wordcount.sh
[root@s166 fantj]# ./wordcount.sh
18/07/31 09:43:49 INFO spark.SparkContext: Running Spark version 1.5.1
18/07/31 09:43:51 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/07/31 09:43:52 INFO spark.SecurityManager: Changing view acls to: root
18/07/31 09:43:52 INFO spark.SecurityManager: Changing modify acls to: root
18/07/31 09:43:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
18/07/31 09:43:54 INFO slf4j.Slf4jLogger: Slf4jLogger started
18/07/31 09:43:54 INFO Remoting: Starting remoting
18/07/31 09:43:55 INFO util.Utils: Successfully started service 'sparkDriver' on port 41710.
18/07/31 09:43:55 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://[email protected]:41710]
18/07/31 09:43:55 INFO spark.SparkEnv: Registering MapOutputTracker
18/07/31 09:43:55 INFO spark.SparkEnv: Registering BlockManagerMaster
18/07/31 09:43:55 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-96c433c9-8f43-40fa-ba4f-1dc888608140
18/07/31 09:43:55 INFO storage.MemoryStore: MemoryStore started with capacity 52.2 MB
18/07/31 09:43:55 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-5d613c5d-e9c3-416f-8d8b-d87bc5e03e02/httpd-3609b712-55f4-4140-9e05-2ecee834b18c 18/07/31 09:43:55 INFO spark.HttpServer: Starting HTTP Server .. . 18/07/31 09:44:12 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetchesin4 ms simple,appeared 1 nearlyappeared 1 Forappeared 1 backappeared 2 thisappeared 4 underappeared 1 itappeared 2 meansappeared 1 introducedappeared 1 revisionappeared 1 when,appeared 1 previousappeared 2 realityappeared 1 typeappeared 1 developedappeared 1 Localappeared 1 simpleappeared 1 ... causingappeared 1 changesappeared 3 andappeared 5 designerappeared 1 approachappeared 1 modifiedappeared 1 systemappeared 1 Systemappeared 1 writeappeared 1 beingappeared 1 programmersappeared 1 18/07/31 09:44:12 INFO Executor. Executor: Finished task 0.0inStage 1.0 (dar). 1165 bytes result sent to driver 18/07/31 09:44:12 INFO scheduler. TaskSetManager: Finished task 0.0inStage 1.0 (dar (1)in200 ms on localhost (1/1) 18/07/31 09:44:12 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool 18/07/31 09:44:12 INFO Scheduler: ResultStage 1 (foreach at WordCountCluster.java:44) finishedin0.209s 18/07/31 09:44:12 INFO Scheduler.DAGScheduler: Job 0 finished: Foreach at WordCountCluster. Java: 44, took 2.938418 s 18/07/31 09:44:12 INFO spark. SparkContext: Invoking stop() from shutdown hook ... . 18/07/31 09:44:13 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
Copy the code