What is the graphs
MapReduce is a computing model adopted by Hadoop for multi-node computing, and to put it more simply, it is a set of methodology for Hadoop to split tasks. When I first came into contact with the concept of MapReduce, it was difficult to understand it at the moment, and I also searched a lot of information, because everyone understood it differently, and the more I looked at it, the more confused I became. In fact, it is essentially a very simple thing. Here is an example to help you understand, since most of the web is an example of Hadoop’s official word calculation (wordcount), here is an example of another scenario.
Imagine the following report card
1, zhang SAN, 78,87,69 2, li si, 56,76,91 3, fifty, 65,46,84 four, six, 89,56,98...
The columns are numbered, student names, Chinese scores, math scores, English scores, now required statistics of the highest scores in each subject, assuming that the transcript is very long, there are tens of millions of lines, without the help of any computer system, how to rely on manual to solve this problem?
- Single statistics
The advantage of assigning a single person to carry out statistics is that it is simple, but the disadvantage is also obvious. It takes a very long time, and even when the amount of data reaches a certain level, it may take a single person a lifetime to complete the statistics
- Many statistical
If there are enough people to do the work, how do you coordinate them? Assume that the report card has 100W lines and 1000 people can be counted
-
- Set up an administrator, the administrator divides the report card into 1000 equally to 1000 people, each person needs to count 1000 lines of data
-
- The administrator made a form and asked everyone to fill in the results of his statistics. The form format is as follows
subjects | Person 1 Results | Person 2 Results | . | 1000 personnel results |
---|---|---|---|---|
Chinese language and literature | ||||
mathematics | ||||
English |
-
- The administrator finally gets the following data
Subject 2 results | | | 1 results personnel… | 1000 results
Chinese language and literature | 80 | 85 | . | 76 | |
mathematics | 89 | 90 | . | 88 | |
English | 94 | 85 | . | 90 |
-
- Each section has 1000 results, and the administrator split the table into 100 small tables to 100 people for statistics, so that each small table has 10 data, the format of the small table is as follows
The first person got a little form
subjects | Person 1 Results | Person 2 Results | . | Staff 10 Results | |
---|---|---|---|---|---|
Chinese language and literature | 80 | 85 | . | 76 | |
mathematics | 89 | 90 | . | 88 | |
English | 94 | 85 | . | 90 |
The second little form I got
subjects | Staff 11 Results | Staff 12 Results | . | Person 20 Results | |
---|---|---|---|---|---|
Chinese language and literature | 83 | 75 | . | 88 | |
mathematics | 79 | 95 | . | 58 | |
English | 94 | 85 | . | 90 |
-
- The administrator collected the results of each person again, and obtained another 100 pieces of data. If the administrator wanted to, he could divide the data and hand it over to multiple people for statistics, so that he could finally get a maximum result. The administrator could also complete the final statistics by himself, because the amount of data was not large.
So in this process, we see, a huge report card after the following steps, finally we get the results we want
- The report card is split into multiple copies
- Each is counted separately
- Register the results
- The results of the statistics can be split again, or can be directly counted
- So after the final results are obtained
This process can be described in MapReduce language as follows:
- Split multiple transcripts – Split
- Each copy is counted separately – MAP
- And register the results – shuffle
- The statistical results can be split again – combiner
- You can also do statistics-reduce directly
In addition, in the administrator’s form, the three subjects are recorded behind
The development of
Let’s solve the above problem with real Java code, assuming you have installed the Hadoop cluster environment as per the previous tutorial
- Create a project
You can create a normal Java project using your familiar IDE. It is recommended that you use Maven for package management and add the following package dependencies
< the dependency > < groupId > org, apache hadoop < / groupId > < artifactId > hadoop - client < / artifactId > < version > 2.5.1 < / version > </dependency>
- Create Mapper
Mapper corresponds to the map process in MapReduce, in the following Mapper code:
StudentMapper.java
public class StudentMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> { public void map(LongWritable longWritable, Text text, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { String[] ss = text.toString().split(","); OutputCollector. Collect (new Text(" Text "), new Intwritable (Integer.ParseInt (SS [2])))); OutputCollector. Collect (new Text(" Math "), new Intwritable (Integer.ParseInt (SS [3])))); OutputCollector. Collect (new Text(" English "), new Intwritable (Integer.ParseInt (SS [4])))); }}
StudentMapper implements the Mapper
interface. There are four parameters. The four parameters mean as follows
- LongWritable: Hadoop will split the TXT file by line. This represents the position of the line in the file, not normally used
- Text: line content, such as the first line
1, zhang SAN, 78,87,69
- Text: As mentioned above, we need to sum up the subjects and calculate the highest score, so the name of the subjects is the key, and the result of each calculation is the value, so we use the Text to indicate the key, because we need to store the name of the subjects
- Intwritable: Stores the calculation result, here refers to the highest score of the subject based on the statistics
Method map(LongWritable LongWritable, Text Text, OutputCollector
OutputCollector, Reporter Reporter). Note that the OutputCollector is an array, which means that multiple results can be written here, and that the Reporter can report the progress of the task to Hadoop. In this Mapper, we don’t do any calculations, we just parse the grades in the text and put them into the OutputCollector by subject. It’s like everyone didn’t do any work the first time, just put the data together. After passing through the mapper, the data from
1, zhang SAN, 78,87,69 2, li si, 56,76,91 3, fifty, 65,46,84 four, six, 89,56,98...
Turned out to be
– | – | – | ||||
---|---|---|---|---|---|---|
Chinese language and literature | 78 | 56 | 65 | 89 | . | |
mathematics | 87 | 76 | 46 | 56 | . | |
English | 69 | 91 | 84 | 98 | . |
StudentReducer.java
public class StudentReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> { public void reduce(Text text, Iterator<IntWritable> iterator, OutputCollector<Text, IntWritable> outputCollector, Reporter reporter) throws IOException { StringBuffer str = new StringBuffer(); Integer max = -1; while (iterator.hasNext()) { Integer score = iterator.next().get(); if (score > max) { max = score; } } outputCollector.collect(new Text(text.toString()), new IntWritable(max)); }}
Now the Reducer starts to actually perform the calculations, Reducer function reduce(Text, Iterator
Iterator, outputCollector
outputCollector, Reporter Reporter) parameters are as follows:
- Text: is the third parameter of the Mapper
- Iterator: The data written to the OutputCollector in the Mapper, combined with the first parameter, is the OutputCollector in the Mapper
- OutputCollector: Reducer The calculated result needs to be written to this parameter, and here we write something similar
Key: the language value: 90
Structure, so the type is<Text, IntWritable>
As mentioned earlier, Mapper will organize the data and write the scores into the OutputCollector according to subjects. When it comes to the Reducer step, Hadoop will summarize the data written by Mapper according to keys (i.e. subjects) and deliver it to the Reducer. The Reducer is responsible for calculating the highest score inside and also writing the results into the OutputCollector.
StudentProcessor
public class StudentProcessor { public static void main(String args[]) throws Exception { JobConf conf = new JobConf(StudentProcessor.class); conf.setJobName("max_scroe_poc1"); conf.setOutputKeyClass(Text.class); conf.setOutputValueClass(IntWritable.class); conf.setMapperClass(StudentMapper.class); conf.setReducerClass(StudentReducer.class); conf.setInputFormat(TextInputFormat.class); conf.setOutputFormat(TextOutputFormat.class); FileInputFormat.setInputPaths(conf, new Path(args[0])); FileOutputFormat.setOutputPath(conf, new Path(args[1])); JobClient.runJob(conf); }}
We also need a startup class that contains the main function, execute the MVN package command to package, let’s say the package name is hadoop-score-job.jar, and upload the jar to the server directory via some tool like FTP.
- Upload data
Hadoop uses the HDFS distributed file system to store large files on multiple nodes. With the HDFS CLI tool, we feel like we are working with local files. In the code above FileInputFormat. SetInputPaths (conf, new Path (args [0])); The user specifies the directory in which the files are used as the data source. The directory here is the directory in HDFS, and the directory must exist, and the data should be uploaded to the directory. Execute the following command to create the directory
hadoop fs -mkdir poc01_input
Execute the following command to import the data into HDFS
hadoop fs -put score.txt poc01_input
Score. TXT for content
1, Zhang San,78,87,69 2, Li Si,56,76,91 3, Wang Wu,65,46,84 4, Zhao Liu,89,56,98
Use the ls command to check whether the file was uploaded successfully
$ hadoop fs -ls poc01_input
Found 1 items
-rw-r--r-- 1 hadoop supergroup 72 2020-12-13 15:43 poc01_input/score.txt
- To perform the job
Execute the following command to start running the Job
$ hadoop jar hadoop-score-job.jar com.hadoop.poc.StudentProcessor poc01_input poc01_output 20/12/13 16:01:33 INFO Client.RMProxy: Connecting connection to resourceManager at master/172.16.8.42:18040 20/12/13 16:01:33 INFO Client.RMProxy: Connecting connection to resourceManager at master/172.16.8.42:18040 20/12/13 16:01:33 INFO Connecting to the ResourceManager at master / 172.16.8.42:18040 20/12/13 16:01:34 WARN graphs. JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 20/12/13 16:01:34 INFO mapred.FileInputFormat: Total input files to process : 1 20/12/13 16:01:35 INFO mapreduce.JobSubmitter: number of splits:2 20/12/13 16:01:35 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1607087481584_0005 20/12/13 16:01:35 INFO conf.Configuration: resource-types.xml not found 20/12/13 16:01:35 INFO resource.ResourceUtils: Unable to find 'resource-types.xml'. 20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = memory-mb, units = Mi, type = COUNTABLE 20/12/13 16:01:35 INFO resource.ResourceUtils: Adding resource type - name = vcores, units = , type = COUNTABLE 20/12/13 16:01:36 INFO impl.YarnClientImpl: Submitted application application_1607087481584_0005 20/12/13 16:01:36 INFO mapreduce.Job: The url to track the job: http://master:18088/proxy/application_1607087481584_0005/ 20/12/13 16:01:36 INFO mapreduce.Job: Running job: job_1607087481584_0005 20/12/13 16:01:43 INFO mapreduce.Job: Job job_1607087481584_0005 running in uber mode : false 20/12/13 16:01:43 INFO mapreduce.Job: map 0% reduce 0% 20/12/13 16:01:51 INFO mapreduce.Job: map 100% reduce 0% 20/12/13 16:01:57 INFO mapreduce.Job: map 100% reduce 100% 20/12/13 16:01:57 INFO mapreduce.Job: Job job_1607087481584_0005 completed successfully 20/12/13 16:01:57 INFO mapreduce.Job: Counters: 49 File System Counters FILE: Number of bytes read=84 FILE: Number of bytes written=625805 FILE: Number of read operations=0 FILE: Number of large read operations=0 FILE: Number of write operations=0 HDFS: Number of bytes read=316 HDFS: Number of bytes written=30 HDFS: Number of read operations=9 HDFS: Number of large read operations=0 HDFS: Number of write operations=2 Job Counters Launched map tasks=2 Launched reduce tasks=1 Data-local map tasks=2 Total time spent by all maps in occupied slots (ms)=12036 Total time spent by all reduces in occupied slots (ms)=3311 Total time spent by all map tasks (ms)=12036 Total time spent by all reduce tasks (ms)=3311 Total vcore-milliseconds taken by all map tasks=12036 Total vcore-milliseconds taken by all reduce tasks=3311 Total megabyte-milliseconds taken by all map tasks=12324864 Total megabyte-milliseconds taken by all reduce tasks=3390464 Map-Reduce Framework Map input records=4 Map output records=12 Map output bytes=132 Map output materialized bytes=90 Input split bytes=208 Combine input records=12 Combine output records=6 Reduce input groups=3 Reduce shuffle bytes=90 Reduce input records=6 Reduce output records=3 Spilled Records=12 Shuffled Maps =2 Failed Shuffles=0 Merged Map outputs=2 GC time elapsed (ms)=395 CPU time spent (ms)=1790 Physical memory (bytes) snapshot=794595328 Virtual memory (bytes) snapshot=5784080384 Total committed heap usage (bytes)=533200896 Shuffle Errors BAD_ID=0 CONNECTION=0 IO_ERROR=0 WRONG_LENGTH=0 WRONG_MAP=0 WRONG_REDUCE=0 File Input Format Counters Bytes Read=108 File Output Format Counters Bytes Written=30
- Hadoop-score-job.jar is the JAR package packaged above. You need to CD to the JAR directory to execute the command
- Com. Hadoop. Poc. StudentProcessor contains the main function of class
- POC01_INPUT data source directory
- POC01_OUTPUT data output directory
When the job is finished, the results are stored in the directory poc01_output
$ hadoop fs -ls poc01_output2 Found 2 items -rw-r--r-- 1 hadoop supergroup 0 2020-12-13 16:01 poc01_output2/_SUCCESS -rw-r--r-- 1 Hadoop SuperGroup 30 2020-12-13 16:01 poc01_output2/part-00000 $Hadoop FS-Cat poc01_output2/part-00000 Math 87 English 98 Chinese 89