Preface

The previous two articles documented the HDFS distributed file system. This article formally begins the MapReduce computing framework, covering the core idea of MapReduce and the eight steps of MapReduce programming.

1. What is MapReduce?

1. Definition of MapReduce

  • MapReduce is a programming framework for distributed computing programs and the core framework for developing Hadoop-based data analysis applications.
  • The core function of MapReduce is to combine user-written business logic code with built-in default components into a complete distributed computing program that runs concurrently on a Hadoop cluster.

2. The core idea of MapReduce

  • The idea behind MapReduce is everywhere in everyday life; almost everyone has been exposed to it. The core concept of MapReduce is “divide and conquer”, which suits large-scale data processing scenarios composed of many complex tasks. Even Google, whose paper popularized this model of distributed computing, applied the idea rather than inventing it.
  • Map is responsible for “splitting”: breaking a complex task into several “simple tasks” that are processed in parallel. Splitting only works if these small tasks can be computed in parallel and have little dependence on each other.
  • Reduce is responsible for “combining”: globally summarizing the results of the Map phase.
  • Together, these two phases are the embodiment of the MapReduce idea.

Here’s another visual way to explain MapReduce:

  • We need to count all the books in the library. You count shelf one, I’ll count shelf two. This is Map. The more of us, the faster we count the books.
  • Now let’s get together and add up everyone’s statistics. This is Reduce.

2. MapReduce programming

1. MapReduce programming model

  • MapReduce is a distributed computing framework designed around the divide-and-conquer approach
  • So what is divide and conquer?
    • Take a complex, computation-heavy, time-consuming task; call it the “large task”;
    • If the result cannot be computed on a single server, or not within a reasonable time, the large task can be divided into small tasks that are executed in parallel on different servers;
    • Finally, the results of the small tasks are summarized
  • MapReduce consists of two phases:
    • Map phase (cut into small tasks)
    • Reduce phase (summarize the results of small tasks)

1. The Map phase

  • The Map phase centers on one key function, map();
  • The input to this function is a key-value pair
  • The output is a series of key-value pairs, which are written to local disk.

2. The Reduce phase

  • The Reduce phase centers on one key function, reduce()
  • The input to this function is also key-value data, namely the output of the map phase grouped by key
  • The output is also a series of key-value pairs, and the result is eventually written to HDFS (a minimal skeleton of both methods is shown after this list)
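Before the full word-count example later in this article, the sketch below shows the shape of these two hook methods in the org.apache.hadoop.mapreduce API. The class names SkeletonMapper and SkeletonReducer are purely illustrative, and the method bodies contain only comments.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Illustrative skeleton only: the concrete key/value types match the word-count case below.
class SkeletonMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // key: the offset of the current line, value: the text of the line
        // context.write(k2, v2) emits intermediate pairs, which are written to local disk
    }
}

class SkeletonReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        // key: one distinct map output key; values: all values grouped under that key
        // context.write(k3, v3) emits the final pairs, which are written to HDFS
    }
}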

2. MapReduce Programming Guidelines (eight steps)

  • Summary of the MapReduce programming model:
  • MapReduce development consists of eight steps: the Map phase consists of two steps, the Shuffle phase consists of four steps, and the Reduce phase consists of two steps.

1. The Map phase consists of two steps

  • Step 1: Set the InputFormat class, which splits the data into key-value pairs (k1, v1) and feeds them into step 2
  • Step 2: Customize the map logic, which processes the (k1, v1) pairs from step 1 and converts them into new key-value pairs (k2, v2) for output

2. The shuffle phase consists of four steps

  • Step 3: Partition the output key-value pairs (data with the same key belongs to the same partition; see the partitioner sketch after this list)
  • Step 4: Sort the data within each partition by key
  • Step 5: Combine the data locally to reduce the amount of data copied over the network (an optional step)
  • Step 6: Group the sorted data; grouping collects the values of the same key into one collection (the reduce method is later called once per group)
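For step 3, Hadoop’s default partitioner already implements the “same key, same partition” rule. The sketch below is essentially what the stock HashPartitioner shipped with Hadoop does; the class name MyPartitioner is illustrative, and a custom partitioner would extend Partitioner in the same way.

import org.apache.hadoop.mapreduce.Partitioner;

// Hash the key, drop the sign bit, and take the remainder modulo the number of reduce tasks,
// so equal keys always land in the same partition (and therefore the same reduce task).
public class MyPartitioner<K, V> extends Partitioner<K, V> {
    @Override
    public int getPartition(K key, V value, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }
}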

3. The Reduce phase consists of two steps

  • Step 7: Merge and sort the outputs of the map tasks, then apply the reduce function’s own logic to process the input key-value pairs and convert them into new key-value pairs (k3, v3) for output
  • Step 8: Set the OutputFormat class to save the output key-value pairs to a file (a concrete walk-through of all eight steps on sample data follows below).
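To make the eight steps concrete, here is an approximate trace of two lines from the word-count input used later in this article (byte offsets approximate; the optional combine step is skipped):

k1, v1 (InputFormat output):        (0, "hello,world")   (12, "hello,flume")
k2, v2 (map output):                (hello, 1) (world, 1) (hello, 1) (flume, 1)
shuffle (partition, sort, group):   (flume, [1])   (hello, [1, 1])   (world, [1])
k3, v3 (reduce output, to HDFS):    flume 1   hello 2   world 1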

3. Common data types in Hadoop

  • Hadoop does not use Java’s built-in basic types directly; it encapsulates its own set of serializable types. The correspondence between the Java types and the types encapsulated by Hadoop is shown below
  • Hadoop data serialization types corresponding to common Java data types are listed in the following table (a short usage sketch follows the table)
Java type Hadoop Writable type
boolean BooleanWritable
byte ByteWritable
int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
Map MapWritable
Array ArrayWritable
byte[] BytesWritable
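A short sketch of how these wrapper types are typically used (wrapping plain Java values, unwrapping them, and reusing the mutable objects); the class name WritableDemo is just for illustration.

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritableDemo {
    public static void main(String[] args) {
        // Wrapping plain Java values in Hadoop's serializable Writable types
        Text word = new Text("hello");              // String -> Text
        IntWritable one = new IntWritable(1);       // int    -> IntWritable
        LongWritable offset = new LongWritable(0L); // long   -> LongWritable

        // ...and unwrapping them again
        String s = word.toString(); // "hello"
        int i = one.get();          // 1
        long l = offset.get();      // 0

        // Writable objects are mutable and are usually reused instead of re-created
        one.set(2);
        word.set("world");
        System.out.println(word + "\t" + one.get());
    }
}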

4. An introductory MapReduce case: word-count statistics

  • Requirement: the input data is shown below; the words on each line are separated by commas, and the job must count the number of occurrences of each word (the expected output is shown after the sample data)
hello,hello
world,world
hadoop,hadoop
hello,world
hello,flume
hadoop,hive
hive,kafka
flume,storm
hive,oozie
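For reference, running word count over this input should produce the following result (with the default TextOutputFormat layout of one key and its count per line, sorted by key):

flume	2
hadoop	3
hello	4
hive	3
kafka	1
oozie	1
storm	1
world	3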

Step 1: Create the Maven project and add the following dependencies to the pom.xml

<repositories>
        <repository>
            <id>cloudera</id>
            <url>https://repository.cloudera.com/artifactory/cloudera-repos/</url>
        </repository>
    </repositories>
    <dependencies>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>2.6.0-mr1-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-hdfs</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
 
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-mapreduce-client-core</artifactId>
            <version>2.6.0-cdh5.14.2</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/junit/junit -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.testng</groupId>
            <artifactId>testng</artifactId>
            <version>RELEASE</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.0</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <!-- <verbose>true</verbose> -->
                </configuration>
            </plugin>
 
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.4.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <minimizeJar>true</minimizeJar>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

Step 2: Define the Mapper class

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
 * A custom mapper class needs to extend Mapper, which has four generic parameters:
 *   keyin:    k1, the line offset       (long   ==> LongWritable)
 *   valuein:  v1, the text of the line  (String ==> Text)
 *   keyout:   k2, each word             (String ==> Text)
 *   valueout: v2, the count 1           (int    ==> IntWritable)
 * Hadoop wraps the basic types: long ==> LongWritable, String ==> Text, int ==> IntWritable
 */
public class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    /**
     * The map method is called once for every line of input.
     * @param key     corresponds to k1
     * @param value   corresponds to v1
     * @param context the context object, used to send data to the next step
     * @throws IOException
     * @throws InterruptedException
     *
     * k1  v1                      k2     v2
     * 0   hello,world     ==>     hello  1
     *                             world  1
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Get the current line of data
        String line = value.toString();
        String[] split = line.split(",");
        Text text = new Text();
        IntWritable intWritable = new IntWritable(1);
        for (String word : split) {
            // Count each word as one occurrence
            // k2 is of type Text, v2 is of type IntWritable
            text.set(word);
            // Write k2 and v2 downstream
            context.write(text, intWritable);
        }
    }
}

Step 3: Define the Reducer class

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    // Step 3 guarantees that data with the same key is sent to the same reduce task;
    // equal keys are merged and their values are collected into an iterable.
    /**
     * Override the reduce method after extending the Reducer class.
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int result = 0;
        for (IntWritable value : values) {
            // Add up the counts
            result += value.get();
        }
        // Wrap the total so it can be written out
        IntWritable intWritable = new IntWritable(result);
        // Output the data
        context.write(key, intWritable);
    }
}
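Because this reduce logic (summing counts) is commutative and associative, the same class can also serve as the optional combiner of step 5, pre-aggregating map output locally before it crosses the network. A hedged one-liner, assuming the Job object named job from the main program assembled in step 4 below:

// Optional step 5: reuse the reducer as a combiner to cut down the shuffled data
job.setCombinerClass(MyReducer.class);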

Step 4: Assemble the main program

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/* This class is the entry class of the MR program; the main method is written here */
public class WordCount extends Configured implements Tool {
    /**
     * After implementing the Tool interface we must implement the run method,
     * which assembles the program logic of the eight steps.
     * @param args
     * @return
     * @throws Exception
     */
    @Override
    public int run(String[] args) throws Exception {
        // Get the Job object and assemble the eight steps, each handled by a class
        Configuration conf = super.getConf();
        Job job = Job.getInstance(conf, "mrdemo1");

        // In practice the program is usually packaged as a JAR and run on the cluster.
        // To run from a packaged JAR on a cluster, the following setting is required.
        job.setJarByClass(WordCount.class);

        // Step 1: read the file and parse it into key-value pairs; k1 is the line offset, v1 is the line of text
        job.setInputFormatClass(TextInputFormat.class);
        // Specify the path from which to read the file
        TextInputFormat.addInputPath(job, new Path("File Location"));
        // Step 2: custom map logic that accepts k1, v1 and outputs the new k2, v2
        job.setMapperClass(MyMapper.class);
        // Set the map output key and value types, i.e. the types of k2 and v2
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Steps 3 through 6: partitioning, sorting, combining and grouping are omitted (defaults are used)
        // Step 7: custom reduce logic
        job.setReducerClass(MyReducer.class);
        // Set the types of k3 and v3
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Step 8: output k3, v3 for saving
        job.setOutputFormatClass(TextOutputFormat.class);
        // Note that the output path must not already exist; if it does, an error is reported
        TextOutputFormat.setOutputPath(job, new Path("Output file location"));
        // Submit the job task
        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
        /*
         * Step 1: read the file and parse it into key-value pairs k1, v1
         * Step 2: custom map logic, accept k1, v1 and output the new k2, v2
         * Step 3: partition - data with the same key is sent to the same reduce task; equal keys are merged and their values form a collection
         * Step 4: sort - sort by k2 (lexicographical order)
         * Step 5: combine - an optional combiner tuning step
         * Step 6: group
         * Step 7: custom reduce logic, accept k2, v2 and output the new k3, v3
         * Step 8: output k3, v3 for saving
         */
    }

    /* The entry point of the program */
    public static void main(String[] args) throws Exception {
        Configuration configuration = new Configuration();
        configuration.set("hello", "world");
        // ToolRunner.run invokes the run method and returns the program's exit status code
        int run = ToolRunner.run(configuration, new WordCount(), args);
        // Exit the whole process with that status code
        System.exit(run);
    }
}
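To try the job, the project is typically packaged with mvn package (the maven-shade-plugin configured above builds a runnable fat JAR during the package phase) and then submitted with something like hadoop jar wordcount.jar WordCount, after replacing the "File Location" and "Output file location" placeholders with real HDFS paths; the JAR name and fully qualified class name here are only examples. As noted in the code, the output directory must not already exist, or the job will fail.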

Conclusion

This article mainly introduced what MapReduce is, the core idea of MapReduce, and the eight steps of MapReduce programming, and finally implemented a small word-frequency-count requirement to help understand and memorize the eight-step model. The working mechanisms of MapTask and ReduceTask and the complete MapReduce flow will be described in detail in the next article.

For more practical content, follow my personal WeChat official account, where followers can also receive extra materials.