The classic Spark example: word count
Preparing the data
Word counting needs a text with a reasonable number of words, so we use the original English text of Gone with the Wind and count how often each word appears in it. To make the text easy to obtain, both it and the corresponding code can be downloaded from GitHub. I put the text file in the project's directory.
First we read that file with the textFile method of SparkContext and try to print its first line.
Scala implementation
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    println(sc.textFile("./GoneWithTheWind").first())
  }
}
Java implementation
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class WordCountJava {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        System.out.println(sc.textFile("./GoneWithTheWind").first());
    }
}
Python implementation
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
print(sc.textFile("./GoneWithTheWind").first())
The output is:
Chapter 1
Take the Scala version as an example; the other two languages work the same way. The first step is to create a SparkConf:
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
We set the master to local and name the application WordCount. The name can be anything you like, even if it differs from the class name. The master, however, cannot be an arbitrary value, so take care when running on a cluster with spark-submit. For now we only cover running locally, so we simply write local.
Next we create a SparkContext, the heart of Spark, and pass in the conf configuration to initialize it:
val sc = new SparkContext(conf)
Finally, we give SparkContext the path of the text and print the first line:
println(sc.textFile("./GoneWithTheWind").first())
Counting the words
Now we can start counting how often each word occurs in the text. Since words are separated by spaces, we can use the space as the word delimiter.
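To see what splitting on spaces actually produces, here is a small plain-Python sketch that needs no Spark. The sample line is only an illustration, and the normalize helper is hypothetical: it is not part of the programs in this article, it merely shows one possible way to strip punctuation if that is wanted.

```python
import re

# Illustrative sample line, not read from the real file
line = "SCARLETT O'HARA was not beautiful, but men seldom realized it"

# Splitting on a single space, as the word count programs do
tokens = line.split(" ")
print(tokens)

def normalize(text):
    """Hypothetical helper: lowercase the text and keep only letters and apostrophes."""
    return re.findall(r"[a-z']+", text.lower())

print(normalize(line))
```

Note that with a plain split, punctuation stays attached to the word, so 'beautiful,' and 'beautiful' would be counted as different words.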
Scala implementation
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    // Set the data path
    val text = sc.textFile("./GoneWithTheWind")
    // Split each line on spaces; flatMap merges the per-line arrays into one large collection
    val textSplit = text.flatMap(line => line.split(" "))
    // Return a tuple (key, value) where key is the word and value is 1
    val textSplitFlag = textSplit.map(word => (word, 1))
    // reduceByKey merges the values of identical keys in textSplitFlag.
    // In (x, y), x is the count so far and y is the next value for that word
    val countWord = textSplitFlag.reduceByKey((x, y) => x + y)
    // Save the result to the result directory under the project directory
    countWord.saveAsTextFile("./result")
  }
}
Java implementation
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Set the data path
        JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
        // Split each line on spaces; flatMap merges the per-line lists into one large collection.
        // FlatMapFunction<String, String>: the first String is the input type, the second is the output element type
        JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // Process the elements of the merged collection: pair each one with the value 1
        // and return a Tuple2, which represents a tuple of two elements.
        // PairFunction<String, String, Integer>: the first String is the input type;
        // the second and third parameters, String and Integer, are the return types.
        // Here it returns a word and the number 1
        JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s, 1);
            }
        });
        // reduceByKey merges the values of identical keys in splitFlagRDD.
        // In the (x, y) passed in, x is the count so far and y is the next value for that word
        JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer + integer2;
            }
        });
        // Save the result to the resultJava directory under the project directory
        countRDD.saveAsTextFile("./resultJava");
    }
}
Python implementation
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
# Set the path of the data
textData = sc.textFile("./GoneWithTheWind")
# Split each line on spaces; flatMap merges the elements of each resulting list into one large collection
splitData = textData.flatMap(lambda line: line.split(" "))
# Process the elements of the merged collection: pair each one with the value 1 and return a tuple (key, value)
# where key is the word and value is 1
flagData = splitData.map(lambda word: (word, 1))
# reduceByKey merges the values of identical keys in flagData.
# In (x, y), x is the count so far and y is the next value for that word
countData = flagData.reduceByKey(lambda x, y: x + y)
# Write the output
countData.saveAsTextFile("./result")
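The three steps used by all the versions above (flatMap, map to pairs, reduceByKey) can be imitated in plain Python to make clear what each one does. This is only a sketch of the semantics on a made-up two-line input, not how Spark executes the job: Spark merges values within each partition first and then across partitions, so y is not always 1 and the function passed to reduceByKey should be associative and commutative.

```python
from functools import reduce
from itertools import chain

lines = ["the twins were", "the Tarleton twins"]  # made-up sample input

# flatMap: split every line, then flatten the per-line lists into one list
words = list(chain.from_iterable(line.split(" ") for line in lines))

# map: pair every word with the count 1
pairs = [(word, 1) for word in words]

# reduceByKey: collect the values of each key, then fold them with x + y
grouped = {}
for word, one in pairs:
    grouped.setdefault(word, []).append(one)
counts = {word: reduce(lambda x, y: x + y, values) for word, values in grouped.items()}

print(counts)
```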
After the Spark job runs, a directory named result appears under the project directory, as shown in the following figure. The _SUCCESS file indicates that the output was generated successfully, and the content itself is stored in part-00000.
We can view part of that file:
('Chapter', 1)
('1', 1)
('SCARLETT', 1)
("O'HARA", 1)
('was', 74)
('not', 33)
('beautiful,', 1)
('but', 32)
('men', 4)
('seldom', 3)
('realized', 2)
('it', 37)
('when', 19)
('caught', 1)
('by', 20)
('her', 65)
('charmas', 1)
('the', 336)
('Tarleton', 7)
('twins', 16)
('were.', 1)
('In', 1)
('face', 6)
('were', 49)
...
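If you want to load the saved result back into ordinary Python, something like the following sketch may help. It assumes the output was written by the Python version, where saveAsTextFile stores one tuple per line in Python literal form; read_counts is a hypothetical helper name, not part of Spark.

```python
import ast
import glob
import os

def read_counts(result_dir):
    """Hypothetical helper: parse the part-* files of a result directory into a dict."""
    counts = {}
    # "part-*" skips the _SUCCESS marker and the hidden .crc checksum files
    for path in sorted(glob.glob(os.path.join(result_dir, "part-*"))):
        with open(path, encoding="utf-8") as f:
            for row in f:
                row = row.strip()
                if row:
                    # Each row looks like ('was', 74)
                    word, count = ast.literal_eval(row)
                    counts[word] = count
    return counts
```

Calling read_counts("./result") would then return a dictionary mapping each word to its count. The Scala and Java versions write tuples in a different textual form, so this parser would not apply to them as is.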
This completes Spark's real HelloWorld program: word count. Comparing the three versions, the Scala and Python code is concise and easy to follow, while the Java implementation is more verbose and harder to read. Of course, readability is relative. If you only know Java, you will understand the Java program well enough, while the concise Scala and Python versions may be unreadable to you. That is fine. A language is just a tool, and what matters is how you use it. Moreover, with Java 8 features we can write clean code in Java too.
Java 8 implementation
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        countJava8(sc);
    }

    public static void countJava8(JavaSparkContext sc){
        sc.textFile("./GoneWithTheWind")
                // Split each line on spaces and flatten the result
                .flatMap(s -> Arrays.asList(s.split(" ")).iterator())
                // Pair each word with the count 1
                .mapToPair(s -> new Tuple2<>(s, 1))
                // Sum the counts of identical words
                .reduceByKey((x, y) -> x + y)
                .saveAsTextFile("./resultJava8");
    }
}
Even this small program, which counts how many times each word appears in a book, shows Spark's strength. Running on a single computer, Spark loads the program, does the work (reading the file, generating temporary files, and writing the results to disk), and finishes in just two seconds.
Optimizing the program
Can the program be made simpler and more efficient? Of course it can: we can use countByValue, a common way to count occurrences.
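As a rough analogy in plain Python (no Spark involved, made-up sample words), countByValue plays the role that collections.Counter plays for a local list: it replaces the explicit "pair with 1, then sum per key" steps with a single call.

```python
from collections import Counter

words = "the twins were the Tarleton twins".split(" ")  # made-up sample

# Long form: pair each word with 1, then sum the ones per word
long_form = {}
for word, one in ((w, 1) for w in words):
    long_form[word] = long_form.get(word, 0) + one

# Short form: count equal values directly, which is the idea behind countByValue
short_form = Counter(words)

print(dict(short_form) == long_form)
```

One difference worth keeping in mind: countByValue returns the counts to the driver program as a local map instead of an RDD, so it suits cases where the number of distinct values fits comfortably in the driver's memory.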
Scala implementation
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")
    val sc = new SparkContext(conf)
    // Set the data path
    val text = sc.textFile("./GoneWithTheWind")
    // Split each line on spaces; flatMap merges the per-line arrays into one large collection
    val textSplit = text.flatMap(line => line.split(" "))
    // countByValue counts how many times each distinct word occurs
    println(textSplit.countByValue())
  }
}
Run results
Map(Heknew -> 1,     "Ashley -> 1, "Let's -> 1, anjingle -> 1, of. -> 1, pasture -> 1, war's -> 1, Wall. -> 1, looks -> 2, Ain't -> 7, ...
Java implementation
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
    public static void main(String[] args){
        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
        JavaSparkContext sc = new JavaSparkContext(conf);
        countJava(sc);
    }

    public static void countJava(JavaSparkContext sc){
        // Set the path of the data
        JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
        // Split each line on spaces; flatMap merges the per-line lists into one large collection
        JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });
        // countByValue counts how many times each distinct word occurs
        System.out.println(splitRDD.countByValue());
    }
}
Run results
{Heknew=1,     "Ashley=1, "Let's=1, anballoon=1, of.=1, pasture=1, war's=1, wall.=1, Looks=2, ain't=7, Clayton=1, approval.=1, ideas=1,
Python implementation
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
# Set the path of the data
textData = sc.textFile("./GoneWithTheWind")
# Split each line on spaces; flatMap merges the elements of each resulting list into one large collection
splitData = textData.flatMap(lambda line: line.split(" "))
# countByValue counts how many times each distinct word occurs
print(splitData.countByValue())
Run the results:
defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, "O'HARA": 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4,
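Because the Python version of countByValue hands back an ordinary dictionary-like object, the result can be post-processed with plain Python. The sketch below uses a few of the counts from the output in this article as sample data and lists the most frequent words; the variable names are only illustrative.

```python
# Sample counts copied from the output shown in this article
counts = {
    "Chapter": 1, "1": 1, "SCARLETT": 1, "was": 74,
    "not": 33, "beautiful,": 1, "but": 32, "men": 4,
}

# Sort by count, highest first, and keep the three most frequent words
top3 = sorted(counts.items(), key=lambda kv: kv[1], reverse=True)[:3]
print(top3)  # [('was', 74), ('not', 33), ('but', 32)]
```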