A, scala
package com.chb.scala import org.apache.spark.SparkConf import org.apache.spark.SparkContext import org.apache.spark.storage.StorageLevel /** * */ object WordCount { def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("wordcount").setMaster("local"); val sc = new SparkContext(conf); Val lines = sc. TextFile (" HDFS: / / 192.168.1.224:9000 / user/root/ABC. Log ", 1); val words = lines.flatMap { line => line.split(" ") }; val pairs = words.map { word => (word, 1) }; val wcs = pairs.reduceByKey(_+_); val sortWCs = wcs.map(x=>(x._2,x._1)).sortByKey().map(x=>(x._2,x._1)); sortWCs.foreach(x=>println(x._1+" appears " + x._2 + "times")); }}Copy the code
Second, the scala version conflict problem occurs
The version of scala library found in The build path of TestSpark (2.10.5) is prior to The one provided by The Scala IDE Setting a Scala Installation Choice to match. TestSpark Unknown Scala Version ProblemCopy the code
Solutions:
Creating a Scala project 1. Remove scala version library 1 from the project. Add the spark library spark-assembly-1.1.0-cdh5.2.0-hadoop2.5.0-cdh5.2.0.jar 3. Modify the Scala compiled version in the project. 4. Right-click Scala and select Set the Scala Installation
Can also be
Right-click Project > Properties > Scala Compiler > Use Project Setting and select the Scala version of Spark. In this case, select Lastest2.10 bundle
Third, the Java
Java programs basically follow the scala process, modified,
package com.chb.java; / / Java package import Java. Util. Arrays; / / spark package import org. Apache. Spark. SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; / / packages import scala. Tuple2; public class WordCount { public static void main(String[] args) { SparkConf conf = new SparkConf().setAppName("wordcount").setMaster("local"); JavaSparkContext JSC = new JavaSparkContext(conf); // JavaSparkContext JSC = new JavaSparkContext(conf); JavaRDD < String > lines = JSC. TextFile (" HDFS: / / 192.168.1.224:9000 / user/root/ABC. Log "); JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @override public Iterable<String> Call (String line) throws Exception {// Convert data to a List return through the arrays.asList () method Arrays.asList(line.split(" ")); }}); JavaPairRDD<String, Integer> pairs = words. MapToPair (new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); }}); //reduceByKey JavaPairRDD<String, Integer> wcs = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer arg0, Integer arg1) throws Exception { return arg0 + arg1; }}); // Replace key value with key value, JavaPairRDD<Integer, String> tmpWCs = wcs.mapToPair(new PairFunction<Tuple2<String,Integer>, Integer, String>() { private static final long serialVersionUID = 1L; @Override public Tuple2<Integer, String> call(Tuple2<String, Integer> tuple) throws Exception { return new Tuple2<Integer, String>(tuple._2, tuple._1); }}); JavaPairRDD<Integer, String> sortedWCs = tmpwcs.sortByKey (); // Replace key value with key value, JavaPairRDD<String, Integer> resultWCs = sortedWcs. mapToPair(new PairFunction<Tuple2<Integer,String>, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(Tuple2<Integer, String> tuple) throws Exception { return new Tuple2<String, Integer>(tuple._2, tuple._1); }}); resultWCs.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> arg0) throws Exception { System.out.println(arg0._1 + "---->" + arg0._2); }}); jsc.close(); }}Copy the code