Goal:

  • Understand the pros and cons of MapReduce
  • Key points: Master the core concepts of MapReduce
  • Understand the framework of MapReduce
  • Key: Master the Shuffle mechanism
  • Understand the functions of Combiner
  • Understand what joins do
  • Master data compression in Hadoop

1 overview graphs

1.1 graphs is defined

MapReduce is a programming framework for distributed computing programs. Its core function is to integrate user-written service logic codes and default components into a complete distributed computing program, which concurrently runs on a Hadoop cluster.

1.2 Advantages and disadvantages of MapReduce

  • advantages
    • MapReduce is easy to program: a distributed application can be created by simply implementing a few interfaces
    • Good scalability: When computing resources are scarce, it can be expanded by adding machines
    • High fault tolerance: When one of the machines is down, computing tasks on it can be transferred to another node so that the task will not fail
    • Suitable for offline processing of mass data above PB level
  • disadvantages
    • Poor at real-time computing: unable to return results in milliseconds or seconds
    • Not good at streaming: MapReduce input data is static
    • Not good at DAG (Directed acyclic graph) calculation: The output result of each MapReduce job is written to the disk, resulting in a large amount of DISK I/O, resulting in low performance

1.3 Core Ideas of MapReduce

  • The core ideas are analyzed as follows:
    • Distributed algorithms require at least two phases, so the first phase has a concurrent instance of MapTask running completely without any connection
    • The ReduceTask concurrent instances in the second stage are unrelated, but their data depends on the output of all MapTask concurrent instances in the previous stage
    • The MapReduce programming model can contain only one Map phase and one Reduce phase. If the user’s business logic is very complex, only multiple MapReduce programs can be executed sequentially.
  • A complete MapReduce program runs in distributed mode with three instance processes
    • MrAppMaster: responsible for the process scheduling and state coordination of the whole program
    • MapTask: Is responsible for the entire data processing process in the Map phase
    • ReduceTask: Responsible for the whole data processing process in the Reduce stage

1.4 MapReduce programming specifications

The program compiled by users is divided into three parts: Mapper, Reducer and Driver

  • Mapper stage

    • A user-defined Mapper inherits its parent class
    • Mapper input data is in the form of KV pairs (customizable)
    • The business logic of a Mapper is written in the map() method
    • Mapper output data is in the form of KV pairs (customizable)
    • The map() method (MapTask process) is called once for each
      ,v>
  • Reducer phase

    • The user-defined Reducer inherits its parent class
    • Input data type of Reducer corresponds to output data type of Mapper, which is also KV
    • Service logic of the Reducer Uninstall reduce()
    • The ReduceTask process calls the Reduce () method once for each group of the same

      group
      ,v>
  • Driver stage

  • The client of the Yarn cluster is used to submit the entire program to the Yarn cluster. The submitted job object encapsulates the operation parameters of the MapReduce program

  • The following is an example: WordCount

2 Hadoop Serialization

2.1 Overview of Serialization

  • define

    • Serialization: Converting an in-memory object into a sequence of bytes (or other data transfer protocol) for easy storage to disk (persistence) and network transfer
    • Deserialization: Converts received byte sequences (or other data transfer protocols) or persistent data from disk into objects in memory.
  • Why serialize

    • In general, “live” objects live only in memory and are lost when power is turned off. And “live” objects can only be used by local processes and cannot be sent to another computer on the network. Serialization, however, can store “live” objects, which can be sent to a remote computer.
  • Hadoop serialization features

    • Compact: Efficient use of storage space
    • Fast: Low overhead for reading and writing data
    • Extensible: Upgrade with the communication protocol upgrade
    • Interoperability: Supports multi-language interaction
  • Custom bean object implementation serialization interface (Writable)

    • The basic serialization types often used in enterprise development do not meet all requirements, such as passing a bean object inside the Hadoop framework, which needs to implement a serialization interface

3 MapReduce framework principles

3.1 InputFormat Data input

  • MapTask parallelism determination mechanism

    • Data blocks: The HDFS physically divides data into blocks
    • Data slicing: Data slicing only logically slashes input, not slices it on disk for storage
  • Source code parsing diagram of Job submission process

3.2 FileInputFormat Slicing mechanism

  • Slice mechanism
    • Simply slice the file according to the content length
    • Slice size, equal to block size by default
    • When slicing, the whole data set is not considered, but each file is sliced separately

3.3 CombineTextInputFormat Slicing mechanism

The default TextInputFormat slicing mechanism of the framework is to plan slicing tasks according to files. No matter how small the file is, it will be a separate slice and will be handed to a MapTask. In this way, if there are a large number of small files, a large number of MapTasks will be generated and the processing efficiency is extremely low.

  • Application scenario: CombineTextInputFormat is used for scenarios with too many small files and can logically plan multiple small files into a slice so that multiple small files can be handed over to a MapTask for processing
  • Maximum size of virtual storage slices: It is best to set the maximum size of virtual storage slices based on the actual size of small files
  • Slicing mechanism: includes virtual stored procedures and slicing procedures

3.4 Customizing InputFormat

  • The steps for customizing InputFormat are as follows:
    • A custom class inherits FileInputFormat
    • Rewrite RecordReader to achieve a complete file read into KV encapsulation
    • Use SequenceFileOutPutFormat to output merged files

4 MapReduce Workflow

MapReduce is the core of Hadoop, and Shuffle is the core of MapReduce.

4.1 Process Diagram

  • Detailed workflow I

  • Detailed workflow II

  • The process,

    The above two processes are the most complete MapReduce workflow. Shuffle starts at Step 7 and ends at step 16. The Shuffle process is as follows:

    • MapTask collects the KV pairs output by our map() method and puts them in the memory buffer

    • A continuous overflow of local disk files from the memory buffer, possibly multiple files

    • Multiple overflow files are merged into a large overflow file

    • During both the overflow and merge processes, the Partitioner is called for partitioning and sorting on keys

    • ReduceTask fetch corresponding result partitioning data from each MapTask machine according to its partition number

    • ReduceTask will take the result files from different MapTasks in the same partition, and then merge these files (merge sort).

    • After merging into large files, the Shuffle process ends and then enters the logical operation process of ReduceTask (extract key value pairs one by one from the file and invoke user-defined reduce() method).

  • Note:

    • The Shuffle buffer size affects the execution efficiency of the MapReduce program. In principle, the larger the buffer size is, the fewer I/O operations are performed on the disk and the higher the execution speed is.
    • The size of the buffer can be adjusted by parameter: io.sort. MB The default value is 100M.

4.2 the Shuffle mechanism

The data processing process after the Map method and before the Reduce method is called Shuffle.

4.3 Partition Partition

The default partition is obtained by modulating the number of ReduceTasks based on the hashCode of key, and users have no control over which key is stored in which partition.

  • Partition summary:
    • If the number of ReduceTask > the number of results of getPartition, several more empty output files part-R-000XX will be generated
    • If 1< the number of ReduceTask < the number of results of getPartition, then some partition data has no place to place and will be Exception
    • If the number of ReduceTask =1, no matter how many partition files MapTask side outputs, the final result will be given to this one ReduceTask, and only one result file part-R-00000 will be generated eventually
    • The partition numbers must be incremented one by one starting from 0
  • A custom Partitioner
    • Customize the Partitioner by overriding the getPartition() method
    • In the Job driver, set up the custom Partitioner
    • After you customize the Partiton, set the corresponding number of reducetasks according to the logic of the customized Partitioner

4.4 WritableComparable sorting

Sorting is one of the most important operations of the MapReduce framework. Both MapTask and ReduceTask will sort the data by key. Any application’s data will be sorted regardless of whether it is logically needed.

The default sort is lexicographical, and the way to achieve this sort is quicksort.

  • Summary of the sorting

    • For MapTask, it will be temporarily in processing the ring buffer, when ring buffer utilization reaches a certain threshold, and then to a quick sort the data in the buffer, and the orderly data to disk, and when the data processing is completed, will merge sort on all files on disk.
    • For ReduceTask, it copies corresponding data files remotely from each MapTask, overwrites to disk if the file size exceeds a certain threshold, and stores in memory otherwise. If the number of files on the disk reaches a certain threshold, merge sort is performed to generate a larger file. If the size or number of files in memory exceeds a certain threshold, data is overwritten to disk after a merge. After all data is copied, ReduceTask uniformly merges and sorts all data in memory and disk.
  • Sorting classification

    • Internal sorting: MapReduce sorts the data set by the key of the input record, ensuring that each output file is internally ordered.
    • Full sort: The final output is a single file, and the file is ordered internally. You can do this by setting only a ReduceTask. However, this method is extremely inefficient when dealing with large files.
    • Auxiliary sorting (GroupingComparator grouping) : Groups keys on the Reduce end. Application: Grouping sort can be used when you want one or more keys with the same field to enter a reduce method when receiving a bean key object.
    • Double sort: In a custom sort, if there are two criteria in compareTo, it is double sort
  • User-defined sort WritableComparable

    • Principle: When bean objects are transmitted by key, the WritableComparable interface needs to override the compareTo method.

4.5 Combiner merger

  • An overview of the
    • Combiner is a component other than Mapper and Reducer in the MR program, and its parent class is Reducer
    • The difference between Combiner and Reducer is that Combiner runs on the node where each MapTask resides, while Reducer receives the output of all Mapper globally
    • Combiner is used to partially summarize the output of each MapTask to reduce network traffic and solve network I/O problems
    • The prerequisite that Combiner can be applied is that the final service logic cannot be affected, and the output kV of Combiner should correspond to the input KV type of the Reducer
  • User-defined Combiner implementation steps
    • Define a Combiner inheriting Reducer, and rewrite Reduce method

5 The working mechanism of MapTask and ReduceTask

5.11 Working mechanism of MapTask

  • Parsing the working mechanism of MapTask

    • Read stage: MapTask parses keys/values from input InputSplit through user-written RecordReader

    • Map phase: This node mainly gives the parsed key/value to the user to write the Map () function for processing, and generates a series of new keys/values

    • Collect stage: When the user writes the map() function, after data processing is complete, outputCollector.collect () will be called to output the results. Inside the function, the generated key/value partition (calling the Partitoner) is written to an annular memory buffer

    • Spill phase: When the ring buffer is full, MapReduce writes data to the local disk and generates a temporary file. Note that before writing data to the local disk, you need to sort the data locally, and merge or compress the data if necessary

      • Write over phase details

        1: Uses the quicksort algorithm to sort data in the cache by Partition number and then by key. In this way, after sorting, data is clustered by partition, and all data in the same partition is ordered by key.

        2: Writes data in each partition in ascending order to the temporary file utput/ spilln. out (N indicates the number of current write overflows) in the working directory of the task. If Combiner is configured, data in each partition is aggregated once before writing files.

        3: Write the meta information of partition data to SpillRecord, where the meta information of each partition includes the offset in the temporary file, the size of the data before compression and the size of the data after compression. If the current memory index size exceeds 1MB, the memory index is written to the file output/ spilln.out.index.

    • Combine phase: When all data processing is complete, MapTask merges all the temporary files once to ensure that only one data file is eventually generated

    When all the data is processed, MapTask combines all the temporary files into a large file and saves the file output/file.out and generates the corresponding index file Output /file.out.index.

    During file merging, MapTask merges files on a partition basis. For a partition, it takes multiple rounds of recursive merging. Merge io.sort.factor (default 10) files in each round and add the resulting files back to the list to be merged. After sorting the files, repeat the process until you end up with a large file.

    Having each MapTask eventually generate only one data file avoids the overhead of having a large number of files open and reading a large number of small files at the same time.

5.2 ReduceTask Working mechanism

  • ReduceTask working mechanism analysis
    • Copy stage: ReduceTask Remote Copy a piece of data from each MapTask and write a piece of data to the disk if its size exceeds a certain threshold, or directly put it into the memory.
    • Merge phase: While copying data remotely, ReduceTask starts two background threads to Merge files in memory and on disk to prevent excessive memory usage or file overload on disk.
    • Sort stage: According to MapReduce semantics, users write reduce() function input data is a group of data aggregated by key. To cluster data with the same key, Hadoop uses a sort-based strategy. As each MapTask has implemented local sorting of its own processing results, ReduceTask only needs to merge and sort all data once
    • Reduce phase: The Reduce () function writes the calculation results to the HDFS
  • Set the ReduceTask parallelism
    • The concurrency of ReduceTask also affects the concurrency and execution efficiency of the whole job, but different from the concurrency of MapTask determined by the number of slices, the number of ReduceTask can be directly and manually set.
  • Pay attention to
    • ReduceTask=0, indicating that there is no Reduce phase and the number of output files is the same as that of Maps
    • The default value of ReduceTask is 1, so output a file
    • If data steps are uneven, data skew may occur in the Reduce phase
    • The number of ReduceTask is not arbitrary, and the business logic requirements should also be considered. In some cases, it is necessary to calculate the global summary result, so there can only be 1 ReduceTask. The exact number of reducetasks depends on cluster performance
    • If the number of partitions is not 1, but ReduceTask is 1, whether to perform the partitioning process. The answer is: do not perform partitioning. Because in the source code of MapTask, the premise of partitioning is to judge whether the number of ReduceNum is greater than 1. If the value is less than 1, it will not be executed

6 OutputFormat Data output

6.1 OutputFormat Interface implementation class

OutputFormat is the base class of MapReduce output. All MapReduce output implements the OutputFormat interface.

  • TextOutputFormat: The default output format is TextOutputFormat, which writes unskipped records as text lines
  • SequenceFileOutputFormat: The SequenceFileOutputFormat output is used as the input of subsequent MapReduce jobs. The format is compact and easy to compress
  • Custom OutputFormat: To control the output path and format of the final file, you can customize the OutputFormat.
    • For example, to output two types of results to different directories based on different data in a MapReduce program, you can use a customized OutputFormat to achieve flexible output requirements.
  • Customizing OutputFormat Steps:
    • A custom class inherits FileOutputFormat
    • Rewrite RecordWriter to write()

7 Summary of MapReduce Development

7.1 When writing the MapReduce program, consider the following aspects:

  • Input data interface: InputFormat
    • The default implementation class is TextInputFormat
    • TextInputFormat reads one line of text at a time and returns the starting offset of the changed line as key and the contents of the line as value
    • KeyValueTextInputFormat Each row is a row separated by key and value delimiters. The default separator is TAB (\t)
    • NlineInputFormat Divides slices based on the specified number of rows N
    • CombineTextInputFormat can combine multiple small files into a single slice to improve processing efficiency
    • You can also customize the InputFormat
  • Logical processing interface: Mapper
    • Users implement three of these methods based on business requirements: map(), setup(), and cleanup()
  • Partitoner partition
    • There is a default implementation of HashPartitioner, where the logic is to return a partition number based on the hash value of the key and numstub
    • You can customize partitions
  • Compareable sorting
    • When you output a customized object as a key, you need to implement the WritableComparable interface and override the compareTo() method
    • Partial sorting: Internal sorting of each file for final output
    • Full sort: Sorts all data, usually only one Reduce
    • Double sort: There are two conditions for sorting
  • Combiner merger
    • Combiner merge improves program execution efficiency and reduces I/O transmission. However, it must not affect the original service processing results.
  • Reduce end group: GroupingComparator
    • Group keys on the Reduce end. Application: When the received key is a bean object, group sorting can be used when one or more keys with the same field (all fields are different) are entered into the same Reduce method
  • Logical processing interface: Reducer
    • Users implement three of these methods: reduce(), setup(), and cleanup()
  • Data output interface: OutputFormat
    • The default implementation class is TextOutputFormat, and the function logic is: output a line to the target file for each KV pair
    • SequenceFileOutputFormat output is used as input for subsequent MapReduce jobs. This is a good output format because it is compact and easy to compress
    • User-defined OutputFormat

summary

This paper mainly summarizes the theory and practice of MR development, continue to come!