Starting with this article, I will systematically write up the lessons learned while working with big data, and later articles will walk through concrete, hands-on projects. The current plan is to publish by series and to summarize the core material of each series within about five articles. Theoretical topics will be explained with diagrams where that helps, and operational topics will be demonstrated with actual code.

This is the first article in the MapReduce series. You will learn about the application scenarios of MapReduce, and there will be a code demonstration at the end of this article.

Preface

Hadoop is an open-source distributed computing framework implemented in Java under Apache. It consists of two parts: HDFS, a distributed file system, and MapReduce, a batch computing framework. This article, the first in the MapReduce series, explains the background of MapReduce, the framework's computation process, its application scenarios, and a demo. The goal is to give you a preliminary understanding of the MapReduce batch computing framework and of how to deploy and use it in a simple way.

Directory

Background of MapReduce

Calculation process of MapReduce

Framework architecture of MapReduce

Life cycle of MapReduce

Application scenarios

Demo

Background of MapReduce

In 2004 Google published the paper MapReduce: Simplified Data Processing on Large Clusters, which proposed the functional characteristics and design concept of MapReduce. The starting point of the MapReduce design is to decompose a large problem into independent small problems and then solve them in parallel. For example, one classic usage scenario of MapReduce is counting the word frequency of a long article: split the article into sentences, split the sentences into individual words, and finally count the occurrences of each word.

MapReduce architecture diagram

Let’s talk about the different components

Client

The Client is how users interact with MapReduce: users submit jobs to the JobTracker through the Client, and they can also use the Client to view the running status of their jobs.

Job Tracker

The JobTracker is responsible for resource monitoring and job scheduling. It monitors the health of the TaskTrackers and of the jobs, and transfers failed tasks to other nodes. It also tracks task progress and resource usage and passes this information to the task scheduler, which then assigns suitable tasks to resources as they become free.

The task scheduler is a pluggable module, so users can design their own scheduler according to their needs.

TaskTracker

The TaskTracker periodically reports its resource usage and task progress to the JobTracker through a heartbeat, and receives instructions from the JobTracker to perform operations (such as starting a new task or killing a task).

A TaskTracker uses slots to divide up the resources on its node, and a task only gets a chance to run after it has been assigned a slot. The scheduler allocates free slots to tasks, and you can configure the number of slots to limit task concurrency.

Task

Tasks are divided into Map tasks and Reduce tasks. In MapReduce, each split is processed by one Map task. The split size can be set through the mapred.max.split.size parameter, and it defaults to the HDFS block size: 128 MB in Hadoop 2.x and 64 MB in Hadoop 1.x.

Generally speaking, each block of a file becomes one split. If the input consists of many small files, there will be many Map tasks, which wastes resources; if a split covers too much data, tasks will have to fetch data across nodes, which also consumes a lot of system resources.
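If you need to control the split size for a specific job, the new (mapreduce) API exposes helper methods for this. Below is a minimal sketch, assuming Hadoop 2.x; the class name SplitSizeExample is made up for illustration, and the exact property names and defaults differ between Hadoop versions.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitSizeExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Lower and upper bounds of a split, in bytes (64 MB and 128 MB here)
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);
        FileInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
    }
}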

Life cycle of MapReduce

There are five steps:

  1. Job submission and initialization

The JobClient uploads the job's jar package and submits the job to the JobTracker through RPC. The JobTracker then creates a JobInProgress object to manage the job and a TaskInProgress object to manage and control each task.

  2. JobTracker schedules tasks

JobTracker schedules and manages tasks. When it finds available resources, it selects an appropriate task based on a policy to use the resources.

The task scheduler has two responsibilities here: one is to keep the job running smoothly by re-running a failed task on another node; the other is speculative execution: if one task's progress falls far behind that of its peers, another copy of the task is started, and the result of whichever copy finishes first is used (a configuration sketch follows this list).

  3. Task operating environment

The TaskTracker prepares a separate JVM for each task so that tasks do not affect each other during execution, and it also uses operating-system mechanisms to isolate resources and prevent a task from abusing them.

  4. Perform the task

The progress of each Task is reported to TaskTracker via RPC, which in turn reports to JobTracker.

  5. The task is complete, and the output file is written to HDFS.
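As mentioned in step 2, the scheduler may launch a backup copy of a task that falls far behind (speculative execution). Below is a minimal sketch of switching this behaviour on or off per job, assuming the Hadoop 2.x mapreduce API; the class name SpeculativeExecutionExample is made up for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class SpeculativeExecutionExample {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration());
        // Allow a backup attempt to be launched for map/reduce tasks that fall far behind
        job.setMapSpeculativeExecution(true);
        job.setReduceSpeculativeExecution(true);
    }
}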

MapReduce calculation process

Let’s take a look at a diagram to systematically understand the MapReduce operation process.

In order to make it easier for you to understand, a new diagram is drawn to show how to divide a long sentence and finally calculate the word frequency (punctuation has been ignored).

The overall process is: read the file and split it into individual words; emitting a count for each word is the job of the Map task; the intermediate results are then sorted in dictionary order; and summing up each word's frequency is the job of the Reduce task. The final step is to output the result to a file. For example, spacedong appears twice in the figure.
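As a concrete illustration (using the sample sentence from the demo later in this article), the key-value pairs flow roughly like this:

Input line:
hello my name is spacedong welcome to the spacedong thank you

Map output, one (word, 1) pair per word:
(hello, 1) (my, 1) (name, 1) (is, 1) (spacedong, 1) (welcome, 1) (to, 1) (the, 1) (spacedong, 1) (thank, 1) (you, 1)

After sorting and grouping by key (dictionary order):
(hello, [1]) (is, [1]) (my, [1]) (name, [1]) (spacedong, [1, 1]) (thank, [1]) (the, [1]) (to, [1]) (welcome, [1]) (you, [1])

Reduce output, the word frequencies:
(hello, 1) (is, 1) (my, 1) (name, 1) (spacedong, 2) (thank, 1) (the, 1) (to, 1) (welcome, 1) (you, 1)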

The five programmable components provided by Hadoop MapReduce are InputFormat, Mapper, Partitioner, Reducer, and OutputFormat, which will be explained in detail in subsequent articles.

To sum up in one sentence, the MapReduce process is to split, sort, and aggregate; it solves statistical problems using the idea of divide and conquer.

Application scenarios of MapReduce

MapReduce was created to break a big problem down into smaller ones; once the small problems are solved, the big problem is solved. So in what kinds of scenarios is it used? There are many; here are a few classic ones.

Calculating URL access frequency

During the use of search engines, a large number of URL visits will be encountered. Therefore, MapReduce can be used to conduct statistics and obtain (URL, number of times) results, which can be used in subsequent analysis.

Inverted index

The Map function parses each document and emits (word, document ID) pairs; the Reduce function takes all the pairs for a given word, sorts the document IDs, and outputs (word, list(document ID)). This forms a simple inverted index, a simple structure for looking up which documents a word appears in.
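A rough sketch of this idea in Java (not code from the original article): the document ID is taken from the input file name, which is an assumption made here purely for illustration.

import java.util.HashSet;
import java.util.Set;
import java.util.StringTokenizer;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

public class InvertedIndexSketch {

    public static class IndexMapper extends Mapper<LongWritable, Text, Text, Text> {
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws java.io.IOException, InterruptedException {
            // Use the file name of the current split as the document ID
            String docId = ((FileSplit) context.getInputSplit()).getPath().getName();
            StringTokenizer st = new StringTokenizer(value.toString());
            while (st.hasMoreTokens()) {
                context.write(new Text(st.nextToken()), new Text(docId));
            }
        }
    }

    public static class IndexReducer extends Reducer<Text, Text, Text, Text> {
        @Override
        protected void reduce(Text word, Iterable<Text> docIds, Context context)
                throws java.io.IOException, InterruptedException {
            // Collect the distinct document IDs that contain this word
            Set<String> docs = new HashSet<String>();
            for (Text id : docIds) {
                docs.add(id.toString());
            }
            context.write(word, new Text(docs.toString()));
        }
    }
}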

Top K problem

In various kinds of document analysis, and in many other scenarios, we often encounter Top K problems, such as outputting the 5 most frequent terms in this article. MapReduce can be used for this kind of statistics as well.
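A rough sketch of the reducer side of a Top K job (not code from the original article), assuming the job is configured with a single reducer that receives (word, 1) pairs from a word-count mapper; for simplicity, words that tie on the same count overwrite each other in this sketch, and K = 5 is an arbitrary choice.

import java.util.Map;
import java.util.TreeMap;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class TopKReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private static final int K = 5;
    // Word frequencies kept sorted by count; only the K largest survive
    private final TreeMap<Integer, String> topK = new TreeMap<Integer, String>();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        for (IntWritable v : values) {
            sum += v.get();
        }
        topK.put(sum, key.toString());
        if (topK.size() > K) {
            // Drop the entry with the smallest count
            topK.remove(topK.firstKey());
        }
    }

    @Override
    protected void cleanup(Context context)
            throws java.io.IOException, InterruptedException {
        // Emit the K most frequent words, largest count first
        for (Map.Entry<Integer, String> e : topK.descendingMap().entrySet()) {
            context.write(new Text(e.getValue()), new IntWritable(e.getKey()));
        }
    }
}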

Demo

Today’s code demo comes in both a Python version and a Java version. The Python version does not rely on any packaged libraries, while the Java version uses the Hadoop dependency packages. Next, I’ll demonstrate a simple use of MapReduce for word counting.

Java version of the code

  • First, prepare a data set containing the words to be counted. Here the file format is txt and the file name is WordMRDemo.txt; it contains the following short sentence, with the words separated by spaces:

hello my name is spacedong welcome to the spacedong thank you

  • Introduce the Hadoop dependency packages:
<!-- The 2.6.5 dependency packages are used here; you can use other versions -->
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>2.6.5</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.6.5</version>
</dependency>
  • Create a WordMapper.java file; its role is to split the input text into words on spaces.
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class WordMapper extends Mapper<LongWritable, Text, Text, IntWritable> {

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws java.io.IOException, InterruptedException {
        String line = value.toString();
        // Split the line into words on whitespace
        StringTokenizer st = new StringTokenizer(line);
        while (st.hasMoreTokens()) {
            String word = st.nextToken();
            // Map output: (word, 1)
            context.write(new Text(word), new IntWritable(1));
        }
    }
}
  • Create a WordReduce.java file; its role is to count the occurrences of each word.
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<IntWritable> iterator, Context context)
            throws java.io.IOException, InterruptedException {
        int sum = 0;
        // Sum up all of the 1s emitted by the Mapper for this word
        for (IntWritable i : iterator) {
            sum += i.get();
        }
        context.write(key, new IntWritable(sum));
    }
}
  • Create a WordMRDemo.java file, which is used to run the Job and start analyzing the sentence.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordMRDemo {
    public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Set the JobTracker address, as configured in hadoop/conf/mapred-site.xml
        conf.set("mapred.job.tracker", "hadoop:9000");
        try {
            // Create a new Job
            Job job = new Job(conf);
            job.setJarByClass(WordMRDemo.class);
            // Set the Mapper class to execute
            job.setMapperClass(WordMapper.class);
            // Set the Reducer class to execute
            job.setReducerClass(WordReduce.class);
            // Set the type of the Map output key
            job.setMapOutputKeyClass(Text.class);
            // Set the type of the Map output value
            job.setMapOutputValueClass(IntWritable.class);
            // Set the number of Reduce tasks; the default is one
            // job.setNumReduceTasks(2);
            // The MapReduce input file; note that a directory can also be given
            FileInputFormat.addInputPath(job, new Path("F:\\BigDataWorkPlace\\data\\input"));
            // The output directory of the job; it must not exist in advance, otherwise an error is reported
            FileOutputFormat.setOutputPath(job, new Path("F:\\BigDataWorkPlace\\data\\out"));
            // Exit after the job completes
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • Finally, run WordMRDemo.java, and you will get the result in the out folder, whose contents look like this:
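Assuming the job succeeds, the out folder typically contains an empty _SUCCESS marker file and one result file per reducer, for example:

_SUCCESS
part-r-00000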

Open the part-r-00000 file; its contents are as follows:
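For the sample sentence above, the contents should look roughly like this (one word and its count per line, sorted by key):

hello	1
is	1
my	1
name	1
spacedong	2
thank	1
the	1
to	1
welcome	1
you	1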

Python code version

  • Create a map.py file to split the input into words.
import sys
import time

for line in sys.stdin:
    # Artificially slow down processing of each line; remove this to run at full speed
    time.sleep(1000)
    ss = line.strip().split(' ')
    for word in ss:
        print '\t'.join([word.strip(), '1'])
  • Create a red.py file to count the words.
import sys

cur_word = None
sum = 0

for line in sys.stdin:
	ss = line.strip().split('\t')
	if len(ss) != 2:
		continue
	word, cnt = ss

	if cur_word == None:
		cur_word = word

	if cur_word != word:
		print '\t'.join([cur_word, str(sum)])
		cur_word = word
		sum = 0

	sum += int(cnt)

print '\t'.join([cur_word, str(sum)])
  • Create a run.sh file and run it directly.
HADOOP_CMD="/usr/local/src/hadoop-2.6.5/bin/hadoop"
STREAM_JAR_PATH="/usr/local/src/hadoop-2.6.5/share/hadoop/tools/lib/hadoop-streaming-2.6.5.jar"

INPUT_FILE_PATH_1="/test.txt"
OUTPUT_PATH="/output"

$HADOOP_CMD fs -rmr -skipTrash $OUTPUT_PATH

# Step 1.
$HADOOP_CMD jar $STREAM_JAR_PATH \
    -input $INPUT_FILE_PATH_1 \
    -output $OUTPUT_PATH \
    -mapper "python map.py" \
    -reducer "python red.py" \
    -file ./map.py \
    -file ./red.py

The above is the core code of the demo; the complete code can be obtained from the GitHub repository at https://github.com/spacedong/bigDataNotes

This is the first article in the MapReduce series. The next article will cover the MapReduce programming model. Stay tuned!

References:

Hadoop Technical Insider: In-depth analysis of MapReduce architecture design and implementation principles