1. MapReduce principle
1.1. The core of the MapReduce framework is:
- Map
- Reduce
When a job is submitted to the MapReduce framework, the system divides it into several Map tasks and assigns them to different nodes for execution. Each Map task processes part of the input data. When a Map task completes, it produces intermediate files that serve as the input of the Reduce tasks. The main objective of a Reduce task is to summarize the output of several Maps. The MapReduce data flow is as follows:
- Map -> shuffle -> sort -> group -> Reduce -> output (intermediate map results are written to the local disk; the final Reduce output is written to HDFS)
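To make the two phases concrete, here is a minimal word-count sketch using the standard org.apache.hadoop.mapreduce API. The class names (WordCountMapper, WordCountReducer) are illustrative choices for this document, not part of Hadoop itself.

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;

// Map: each input line is tokenized into words, emitting (word, 1) pairs.
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text word = new Text();

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        StringTokenizer tokens = new StringTokenizer(line.toString());
        while (tokens.hasMoreTokens()) {
            word.set(tokens.nextToken());
            context.write(word, ONE);          // intermediate (key, value) pair
        }
    }
}

// Reduce: all counts for the same word arrive together and are summed.
class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text word, Iterable<IntWritable> counts, Context context)
            throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable c : counts) {
            sum += c.get();
        }
        context.write(word, new IntWritable(sum)); // final (word, total) pair
    }
}
```

Between the two phases, the framework groups all (word, 1) pairs by key, which is exactly the shuffle/sort/group step in the flow above.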
1.2 MapReduce Process Analysis
1.2.1 Mapper process
1.2.1.1. InputSplit: the input file is split into InputSplits, and each InputSplit is processed by one Map task
- By default, the size of an input split (InputSplit) is the same as the size of a data block (Block)
- By default, one HDFS block (128 MB by default) is used as one split; the block size can be configured.
- With the default 128 MB block size, suppose there are two input files, one of 32 MB and one of 172 MB. The small file forms one input split, while the large file is divided into two data blocks and therefore two input splits, so three input splits are generated in total. Each input split is processed by one Mapper process, so these three splits are processed by three Mapper processes.
- This also shows that Hadoop can run into performance problems when handling large numbers of small files, because too many Map tasks are created.
- In addition, if there are too many small files, each file occupies one block, and the NameNode stores metadata for each of them, which consumes NameNode resources.
- Accessing a large number of small files is much slower than accessing a few large files. HDFS was originally designed for streaming access to large files; reading many small files requires constantly jumping from DataNode to DataNode, which seriously hurts performance. In short, processing a large number of small files is much slower than processing large files of the same total size.
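Following on from the points above, one common mitigation (available in newer Hadoop releases) is to pack many small files into fewer splits with CombineTextInputFormat. The sketch below is illustrative only; the 128 MB cap is an assumed value chosen to match the default block size mentioned above, not something prescribed by this text.

```java
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;

public class SmallFileJobSetup {
    public static void configure(Job job) {
        // Pack many small files into combined splits instead of one split per file,
        // so the number of Map tasks stays manageable.
        job.setInputFormatClass(CombineTextInputFormat.class);

        // Upper bound on the size of a combined split (here 128 MB, matching the
        // default HDFS block size mentioned above); an illustrative value.
        CombineTextInputFormat.setMaxInputSplitSize(job, 128L * 1024 * 1024);
    }
}
```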
1.2.1.2. By default, each line of text is parsed into a key-value pair: the key is the starting position (in bytes) of the line, and the value is the text content of the line.
1.2.1.3. Call the map method of the Mapper class
- The map method is called once for each key-value pair parsed in the previous step. If there are 1000 key-value pairs, the map method is called 1000 times. Each call to the map method outputs zero or more key-value pairs.
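To illustrate both of the points above, the hypothetical Mapper below receives the default (byte offset, line text) pairs described in 1.2.1.2 and, on each call of map, emits either zero or one output pair depending on whether the line contains the string "ERROR" (an arbitrary example keyword, not something from the text).

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Input key: byte offset of the line start; input value: the line's text
// (the default behaviour described in 1.2.1.2).
public class GrepMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);

    @Override
    protected void map(LongWritable offset, Text line, Context context)
            throws IOException, InterruptedException {
        // map() is invoked once per (offset, line) pair; each call may emit
        // zero pairs (no match) or one pair (a match).
        if (line.toString().contains("ERROR")) {
            context.write(new Text("ERROR"), ONE);
        }
    }
}
```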
1.2.1.4. Processing of the map output:
- 1. Temporary buffer
- The map output is temporarily placed in a circular memory buffer (100 MB by default, controlled by io.sort.mb).
- 2. Partition
- Partitioning is based on the key and determines which Reducer each record will be processed by (a custom Partitioner and the related job settings are sketched after this list).
- 3. Sort (records in the buffer are sorted by key within each partition)
- 4. Spill to disk
- A spill can be thought of as a step that includes sorting and, optionally, a Combiner (if the user has defined one).
- When the buffer is about to overflow (80% of its size by default, controlled by the io.sort.spill.percent property), a spill file is created on the local disk and the data in the buffer is written to it.
- There may be many spill files, so they need to be merged. During the merge, sorting and the Combiner are applied repeatedly, for two purposes:
- Minimize the amount of data written to disk each time
- Minimize the amount of data transferred over the network in the next copy phase. The end result is one partitioned and sorted file. To further reduce the amount of data sent over the network, the map output can be compressed here by setting mapred.compress.map.output to true (see the configuration sketch after this list).
- 5. Merge on disk
- 6. Copy the data in each partition to the corresponding Reduce task
- One might ask: how does the data in a partition know which Reduce task it corresponds to? Each Map task keeps in touch with its parent TaskTracker, and each TaskTracker keeps in touch with the JobTracker, so the JobTracker holds the macro-level information for the entire cluster. The Reduce tasks obtain the map output locations from the JobTracker.
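The sketch below, referenced from the partition item above, ties several of these map-side knobs together: a hypothetical custom Partitioner plus job settings for the combiner, the sort buffer, and map-output compression. The property names are the Hadoop 1.x ones quoted in the text; the class names and values are illustrative assumptions, not prescriptions.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;

// Hypothetical partitioner: keys starting with "a"–"m" go to partition 0,
// everything else to partition 1 (assumes two reduce tasks).
public class AlphabetPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numReduceTasks) {
        if (numReduceTasks < 2 || key.getLength() == 0) {
            return 0;
        }
        char first = Character.toLowerCase(key.toString().charAt(0));
        return (first >= 'a' && first <= 'm') ? 0 : 1;
    }
}

class MapSideSetup {
    public static void configure(Job job) {
        Configuration conf = job.getConfiguration();

        // Route map output to reducers with the custom partitioner above.
        job.setNumReduceTasks(2);
        job.setPartitionerClass(AlphabetPartitioner.class);

        // Optional combiner, applied during spill and merge on the map side;
        // the illustrative WordCountReducer from 1.1 works because summing is associative.
        job.setCombinerClass(WordCountReducer.class);

        // Map-side sort buffer and map-output compression, using the Hadoop 1.x
        // property names quoted in the text; the values are illustrative.
        conf.setInt("io.sort.mb", 200);
        conf.setBoolean("mapred.compress.map.output", true);
    }
}
```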
1.2.2. Reducer process
- 1. The Reducer task actively copies the output key-value pairs from the Mapper tasks, and the data from each map is already sorted. If the amount of data received on the reduce side is small, it is kept in memory; once it exceeds a certain proportion of the buffer size, it is merged and spilled to disk.
- The buffer size is controlled by the mapred.job.shuffle.input.buffer.percent property, which specifies the percentage of heap space used for this purpose (see the tuning sketch after this list).
- The spill threshold is controlled by the mapred.job.shuffle.merge.percent property.
- 2. As the number of spill files grows, a background thread merges them into a larger sorted file (sort and merge).
- This saves time in later merges. In fact, MapReduce repeatedly performs sort and merge operations on both the Map side and the Reduce side.
- 3. Many intermediate files are written to disk during the merge process, but MapReduce writes as little data to disk as possible; the result of the final merge is not written to disk but is fed directly to the reduce function.
- 4. Reduce
- The reduce method is called once per key, on the set of values that share that key.
- 5. Output
- The output key-value pairs are written to files on HDFS.
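A minimal sketch of how the reduce-side shuffle thresholds quoted above could be set; the property names are the Hadoop 1.x ones from the text, and the values shown are simply their usual defaults, not tuning advice.

```java
import org.apache.hadoop.conf.Configuration;

public class ReduceShuffleTuning {
    public static void apply(Configuration conf) {
        // Fraction of reducer heap used to buffer map outputs during the copy phase.
        conf.setFloat("mapred.job.shuffle.input.buffer.percent", 0.70f);

        // When the in-memory buffer reaches this fraction, merge and spill to disk.
        conf.setFloat("mapred.job.shuffle.merge.percent", 0.66f);
    }
}
```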
1.2.3 Shuffle Process
Shuffle covers the path from the map output (partition -> sort -> spill -> merge) to the reduce input (merge -> sort).
3. Analysis of the MapReduce Working Mechanism (Hadoop 1.x)
- 1. Submit the MapReduce program on any node in the cluster.
- 2. After receiving the job, JobClient requests a job ID from JobTracker.
- 3. The resource files required to run the job are copied to HDFS (including the JAR file packaged for the MapReduce program, the configuration files, and the input split information computed by the client). These files are stored in a folder created by JobTracker for the job.
- 4. After obtaining the job ID, submit the job (a minimal driver that performs this client-side submission is sketched after this list).
- 5. When JobTracker receives the job, it places it in a job queue to wait for the job scheduler. When the scheduler picks the job according to its scheduling algorithm, it creates one map task for each input split and assigns the map tasks to TaskTrackers for execution.
- 6. Each TaskTracker has a fixed number of map slots and reduce slots, determined by the number of cores and the amount of memory on the host. Note that map tasks are not assigned to TaskTrackers at random; there is a concept called data locality: a map task is assigned to a TaskTracker that holds the data block the map will process, and the JAR package is copied to that TaskTracker to run.
- 7. TaskTracker sends JobTracker a heartbeat at regular intervals, telling JobTracker that it is still running and carrying information such as the progress of the current map task. When JobTracker receives the completion message for the last task of a job, it marks the job as successful. When JobClient queries the status, it learns that the job is complete and displays a message to the user.
- 8. The TaskTracker that runs a task obtains the resources it needs from HDFS, including the JAR file packaged for the MapReduce program, the configuration files, and the input splits computed by the client.
- 9. After TaskTracker obtains the resources, it starts a new JVM.
- 10. Run each task.
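For reference, here is a minimal driver performing the client-side submission described in steps 1 to 4. It reuses the illustrative WordCountMapper and WordCountReducer from section 1.1 and the newer Job API (on Hadoop 1.x the equivalent construction is new Job(conf, ...), submitted via JobClient); the input and output paths are placeholders.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCountDriver {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");

        // The JAR containing this class is shipped to HDFS and then to the
        // nodes that run the tasks (steps 3, 6 and 8 above).
        job.setJarByClass(WordCountDriver.class);

        job.setMapperClass(WordCountMapper.class);   // illustrative classes from 1.1
        job.setReducerClass(WordCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Placeholder paths; input splits are computed from the input path.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Submits the job and polls its status until it finishes (steps 2, 4, 7).
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```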