Overview

Shuffle is the process that moves data from a MapTask to a ReduceTask, so it is the bridge between Map and Reduce. Map output must pass through Shuffle before Reduce can consume it, and Shuffle performance directly affects the performance of the whole job.

From what we have covered so far, Shuffle consists of two parts: data preparation on the Map side and data copying on the Reduce side.

Shuffle mechanism

1. Map first writes its KV output into an in-memory area called the buffer.

2. The default size of the buffer is 100 MB. When more than 80% of the buffer is in use, a spill to disk is triggered.

3. Spilled data is divided by partition, and the records in each partition are sorted by key (quicksort, applied to the index rather than to the records themselves). A spill can happen more than once. (Map side)

4. Each spill is stored in a temporary file. The spill files are then merged with a merge sort, partition by partition, optionally compressed when the data is large, and written to disk.

5. On the Reduce side, the sorted data is copied over, grouped by key, and then processed by the reduce method.
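The map-side steps above can be sketched as a toy model in plain Java. This is only an illustration under stated assumptions, not Hadoop's implementation: record counts stand in for bytes, the spill runs synchronously instead of in a background thread, the final merge is simplified to a re-sort, and every name (`SpillSketch`, `KV`, `collect`, `spill`, `mergeSpills`) is hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Toy model of the map-side shuffle steps. Record counts stand in for bytes,
// and every name in this class is hypothetical (not a Hadoop API).
class SpillSketch {
    static final class KV {
        final int partition;
        final String key;

        KV(int partition, String key) {
            this.partition = partition;
            this.key = key;
        }
    }

    static final int BUFFER_CAPACITY = 10; // stands in for the 100 MB buffer
    static final int SPILL_PERCENT = 80;   // spill once 80% of the buffer is used

    // Order by partition number first, then by key within a partition.
    static final Comparator<KV> ORDER = (a, b) ->
            a.partition != b.partition
                    ? Integer.compare(a.partition, b.partition)
                    : a.key.compareTo(b.key);

    final int numPartitions;
    final List<KV> buffer = new ArrayList<>();
    final List<List<KV>> spills = new ArrayList<>();

    SpillSketch(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    // Steps 1-2: tag the record with its partition, buffer it,
    // and spill once the threshold is reached.
    void collect(String key) {
        int partition = (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        buffer.add(new KV(partition, key));
        if (buffer.size() * 100 >= BUFFER_CAPACITY * SPILL_PERCENT) {
            spill();
        }
    }

    // Step 3: sort the buffered records and keep them as one spill.
    void spill() {
        if (buffer.isEmpty()) {
            return;
        }
        List<KV> run = new ArrayList<>(buffer);
        run.sort(ORDER);
        spills.add(run);
        buffer.clear();
    }

    // Step 4: flush the remainder, then combine all spills into one sorted
    // output. A real merge walks the sorted spills side by side; re-sorting
    // the concatenation is a shortcut that gives the same order.
    List<KV> mergeSpills() {
        spill();
        List<KV> merged = new ArrayList<>();
        for (List<KV> run : spills) {
            merged.addAll(run);
        }
        merged.sort(ORDER);
        return merged;
    }

    public static void main(String[] args) {
        SpillSketch sketch = new SpillSketch(2);
        for (int i = 0; i < 20; i++) {
            sketch.collect("key" + i);
        }
        System.out.println("spills before merge: " + sketch.spills.size());
        for (KV kv : sketch.mergeSpills()) {
            System.out.println(kv.partition + "\t" + kv.key);
        }
    }
}
```

With a capacity of 10 records and an 80% threshold, the sketch spills after every 8 records, so 20 records produce two spills plus a final flush of the remaining 4 before the merge.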

Partition

The collection thread calculates the partition number as it puts the data into the buffer.

    // Set the number of reduce tasks
    job.setNumReduceTasks(2);

Partitioning concept: Output data to multiple files based on conditions.

Why does setting the number of reduce tasks result in partitioning?

    NewOutputCollector(JobContext jobContext, JobConf job,
                       TaskUmbilicalProtocol umbilical, TaskReporter reporter)
            throws IOException, ClassNotFoundException {
        this.collector = MapTask.this.createSortingCollector(job, reporter);
        this.partitions = jobContext.getNumReduceTasks();
        if (this.partitions > 1) {
            // With more than one reduce task, load the partitioner class named by
            // the mapreduce.job.partitioner.class parameter. If that parameter is
            // not configured, getPartitionerClass() returns HashPartitioner.class.
            this.partitioner = (Partitioner) ReflectionUtils.newInstance(
                    jobContext.getPartitionerClass(), job);
        } else {
            // Otherwise every record gets partition number partitions - 1,
            // which is 0 for a single reduce task.
            this.partitioner = new Partitioner<K, V>() {
                public int getPartition(K key, V value, int numPartitions) {
                    return NewOutputCollector.this.partitions - 1;
                }
            };
        }
    }

How is the partition data divided?

Hadoop's default partitioner is HashPartitioner. It takes the hash value of the key modulo the number of reduce tasks to get the partition for that key. Hadoop also supports user-defined partitioners.

    public class HashPartitioner<K, V> extends Partitioner<K, V> {
        /** Use {@link Object#hashCode()} to partition. */
        public int getPartition(K key, V value, int numReduceTasks) {
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }

From this we can see that the default partition is the hashCode of the key modulo the number of reduce tasks. With the default partitioner, users cannot control which key is stored in which partition. (A custom partitioner can be used for that.)
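To make the arithmetic concrete, here is a small sketch that applies the same expression as `HashPartitioner.getPartition` to plain Java objects, without any Hadoop dependency. The class and method names (`HashPartitionSketch`, `partitionFor`) are made up for this illustration. It also shows why the `& Integer.MAX_VALUE` mask is there: it clears the sign bit, so a negative hash code cannot produce a negative partition number.

```java
// Illustration only: the names below are hypothetical, not part of Hadoop.
class HashPartitionSketch {
    // Same arithmetic as HashPartitioner.getPartition, on plain Java objects.
    static int partitionFor(Object key, int numReduceTasks) {
        return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
    }

    public static void main(String[] args) {
        int numReduceTasks = 3;
        Object[] keys = { "a", "hadoop", -7 };
        for (Object key : keys) {
            System.out.println(key + " -> partition "
                    + partitionFor(key, numReduceTasks));
        }
        // Without the mask, a negative hash code gives a negative remainder,
        // which would not be a valid partition number.
        System.out.println("-7 % 3 without the mask = " + (-7 % 3));
    }
}
```

The result depends only on the hash code and the number of reduce tasks, which is why the same key always lands in the same partition but the user has no say in which one.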

4. Customize partitions

Requirement: write records to different files according to the phone number prefix. The categories are 136, 137, 138, 139, and everything else, five in total.

Building on the earlier driver, add a partitioner class, then register it in the driver and set the number of reduce tasks to match.

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    /**
     * Custom partitioner: extend the Partitioner class and override getPartition.
     *
     * Partitioner<KEY, VALUE>
     *   KEY:   the key type of the mapper output
     *   VALUE: the value type of the mapper output
     */
    public class PhoneNumPartitioner extends Partitioner<Text, FlowBean> {
        /**
         * Partition rule: choose the partition by phone number prefix.
         */
        @Override
        public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
            String phoneNum = text.toString();
            int partition;
            if (phoneNum.startsWith("136")) {
                partition = 0;
            } else if (phoneNum.startsWith("137")) {
                partition = 1;
            } else if (phoneNum.startsWith("138")) {
                partition = 2;
            } else if (phoneNum.startsWith("139")) {
                partition = 3;
            } else {
                partition = 4;
            }
            return partition;
        }
    }

    import java.io.IOException;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class FlowCountDriver {
        public static void main(String[] args)
                throws IOException, InterruptedException, ClassNotFoundException {
            args = new String[] { "D:/cs/writable/input", "D:/cs/writable/out10" };

            // Get the configuration and the job object
            Configuration entries = new Configuration();
            Job job = Job.getInstance(entries);

            // Set the jar by the driver class
            job.setJarByClass(FlowCountDriver.class);

            // Set the map and reduce classes
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);

            // Set the map output types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            // Set the final output types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            // Specify the custom partitioner
            job.setPartitionerClass(PhoneNumPartitioner.class);

            // Specify the matching number of reduce tasks
            job.setNumReduceTasks(5);

            // Set the input and output paths
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));

            // Submit the job and wait for it to finish
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    }

[Note]: Set the number of reduce tasks according to business requirements.

  • If it is not set, the default number of reduce tasks is 1, and the partition number is always 0.
  • If 1 < number of reduce tasks < number of partitions, the job reports an error, because some records are assigned a partition number that has no matching reduce task.
  • If the number of reduce tasks is greater than the number of partitions, no error is reported; the extra reduce tasks run once with no input.
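The three cases in the note can be sketched in plain Java, reusing the prefix rule from PhoneNumPartitioner on ordinary strings. This is a simplified model rather than Hadoop's code: the names `ReduceCountSketch`, `phonePartition`, and `effectivePartition` are hypothetical, and the unchecked exception is a stand-in for the error the framework reports when a partition number falls outside the range of reduce tasks.

```java
// Simplified model of how the reduce task count interacts with a custom
// partitioner. All names are hypothetical; this is not Hadoop's code.
class ReduceCountSketch {
    // Same prefix rule as PhoneNumPartitioner, applied to a plain String.
    static int phonePartition(String phoneNum) {
        if (phoneNum.startsWith("136")) return 0;
        if (phoneNum.startsWith("137")) return 1;
        if (phoneNum.startsWith("138")) return 2;
        if (phoneNum.startsWith("139")) return 3;
        return 4;
    }

    // Partition number a record ends up with for a given reduce task count.
    static int effectivePartition(String phoneNum, int numReduceTasks) {
        if (numReduceTasks == 1) {
            // A single reduce task: the custom rule is bypassed, always 0.
            return 0;
        }
        int partition = phonePartition(phoneNum);
        if (partition < 0 || partition >= numReduceTasks) {
            // Stand-in for the framework's "illegal partition" failure.
            throw new IllegalStateException(
                    "Illegal partition for " + phoneNum + " (" + partition + ")");
        }
        return partition;
    }

    public static void main(String[] args) {
        String phone = "13912345678"; // made-up sample number
        System.out.println("1 reduce task:  " + effectivePartition(phone, 1));
        System.out.println("5 reduce tasks: " + effectivePartition(phone, 5));
        System.out.println("6 reduce tasks: " + effectivePartition(phone, 6));
        try {
            effectivePartition(phone, 3);
        } catch (IllegalStateException e) {
            System.out.println("3 reduce tasks: " + e.getMessage());
        }
    }
}
```

With 6 reduce tasks the rule never returns partition 5, so in this model the sixth task simply receives no records, which matches the "runs empty" case in the note.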