preface
In the last article, we looked at the HDFS read and write flow, HA high availability, federation, and Sequence Files schemes, so let’s briefly review the HDFS write flow
The Client invokes the Create method of the Distributed FileSystem, remotely invoking NameNode’s Create method, which does four things
-
Check that you are running properly
-
Check whether the file to be written into the HDFS exists
-
Check whether the client has the creation permission
-
Edits log this operation
The create method returns an OutputStream that needs to interact with the NameNode by calling the addBlock() method of the NameNode to know which data nodes the block needs to be written to.
Start writing data on a chuck with a 4-byte checkSum (516 bytes in total), and then write these Chucks into a larger structure package. After the package is filled with multiple Chucks, After putting the package into a queue called data Queue, you do two things
-
The packages in the data queue are transferred to datanodes according to the list returned by the addBlock() method of NameNode
-
The transmission to the DataNode is also transmitted to the ack queue
-
Create a checkSum for the data transmitted in the DataNode and compare it with the checkSum before packaging
-
If the validation succeeds, the package is removed from the ack queue; otherwise, the package is re-placed in the data queue for retransmission
The addBlock() method is used to write the new file.
Exceptions will not be re-explained, you can jump straight to the second chapter to see
1. MapReduce programming model
MapReduce is a distributed computing framework designed with a divide-and-conquer approach
If a single server cannot perform a complex or computating task, divide the task into smaller tasks, which are executed on different servers in parallel, and then summarize the results of each task
MapReduce consists of two phases, which are divided into the Map phase for cutting small tasks and the Reduce phase for summarizing small tasks, as shown in the following figure. Note that three small tasks can be executed in parallel
1.1 the Map phase
The map() function inputs key-value pairs, outputs a series of key-value pairs, and writes the output to local disk
1.2 Reduce phase
The input of the reduce() function is key-value pairs (that is, the output of the Map () function), and the output is a series of key-value pairs that are eventually written to HDFS
The general logic is pretty clear in the figure below, which will be explained after the shuffle process
2. MapReduce programming example
Word frequency statistics, statistics of an article, the number of occurrences of each word
2.1 Schematic diagram analysis
From left to right, there is a file, HDFS blocks it, and each block can also be regarded as a split (split), and then it provides a KV pair (0,Dear Bear River), why is the key 0? So the 0 here is actually an offset, and the offset varies with the size of the data in the file. In the current example, we don’t need to use it for the moment. What we need to do is to split the Dear Bear River as value and make statistics. After the statistics are finished, we start to read the Dear Car in the second line and output the same.
After the statistics of the three blocks divided into this file are completed, the same word is aggregated to the same node for statistics, and the result can be obtained
Issues that need attention
1. We can see that there are four words and four Reduce tasks in the figure above, but the number of reduce tasks is determined by the developers themselves, which is only a problem of SetReduceNum(4)
2. Why can reduce know how many words there are? Let’s talk about shuffle.
3. If you are careful, you will find that there are four keys after shufflling (Dear,1), but there should only be one key
2.2 mapper code
public class WordMap extends Mapper<LongWritable, Text, Text, IntWritable> { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String[] words = value.toString().split(" "); Context. Write (new Text(word), new IntWritable(1)); }}}Copy the code
Here, LongWritable corresponds to Java’s Long type, and Text corresponds to String type. Because there are problems of serialization and deserialization of data from one node to another in distributed framework, Hadoop itself provides some classes with serialization function for us to use. So the key-value pair we normally see is (Long, String), but in this case it’s just (LongWritable,Text).
Then we overwrite the map() method to achieve word segmentation, and output each word as a key in the (word,1) state.
If you want to check out these API methods, you can go to the Hadoop website. I’m still using 2.7.3 here, as those of you who read the last article probably know
There are two mappers here because the first Mapper was the old Mapper, and now the new Mapper is being used. Click Method to see the map() Method you just used
2.3 Reducer code
public class WordReduce extends Reducer<Text, IntWritable, Text, IntWritable> { /* key: hello value: List(1, 1, ...) */ protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable count : values) { sum = sum + count.get(); } context.write(key, new IntWritable(sum)); // Output final result}; }Copy the code
Based on the previous 2.2, this code is no longer expanded. Instead, it sums the value and produces a sum. The key is still a word, and the output state is (word,sum).
Note: When the list in value is very large, the data size can be reduced by increasing the cluster memory or setting some sentence reading limits (custom InputFormat class, TextInputFormat is MapReduce’s default).
2.4 Main () method executed by the program
Basically, each of the main methods here is just copied and then filled in the parameters of the set method
public class WordMain { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { if (args.length ! = 2 || args == null) { System.out.println("please input Path!" ); System.exit(0); } Configuration configuration = new Configuration(); / / generates a job instance job job. = the job getInstance (configuration, WordMain. Class. GetSimpleName ()); Job.setjarbyclass (wordmain.class); / / input/output formats by job Settings / / the default input format is MR TextInputFormat, so comment out no problem. / / job setInputFormatClass (TextInputFormat. Class); //job.setOutputFormatClass(TextOutputFormat.class); / / set the input/output Path FileInputFormat. SetInputPaths (job, new Path (args [0])); FileOutputFormat.setOutputPath(job, new Path(args[1])); // Set job.setmapperClass (wordmap.class) to handle the Map/Reduce phase; job.setReducerClass(WordReduce.class); // If the kv pairs of map and Reduce outputs are of the same type, directly set the KV pairs of reduce outputs. If it's different, Need to set up the map respectively, reduce the output of the type of kv / / job setMapOutputKeyClass (. Class) / / set the final output key/value type m job setOutputKeyClass (Text. Class); job.setOutputValueClass(IntWritable.class); // Submit the job. WaitForCompletion (true); }}Copy the code
You can run it locally, in clusters, or in maven packages. The results can be viewed through YARN. Considering that you may not have time to build a cluster, there will be no texture here.
2.5 combiner
The local aggregation on the Map does not affect the final result no matter how many combiner operations are run
Note: Not all MapReduce programs are suitable for use, such as finding Average
WordCountMap and WordCountReduce code unchanged WordCountMain, add job.setCombinerClass(WordCountReduce. Class);Copy the code
There will be a large number of key-value pairs when we just go through the Mapping. Then, if they are all transmitted in the format of (Word,1), the amount of data to be transmitted will be huge. Therefore, the best solution at this time is to combine a word locally, namely combine operation, as shown in the figure, two (Dear, 1) become one (Dear,2), and two (Car,1) become (Car,2), etc
2.6 the shuffle process
The map task outputs data to a ring buffer. Each ring buffer is 100M in size. As data is read and written continuously, the ring buffer’s memory reaches 80%, causing overwrite to disk and writing these files to disk
The first is partitioning. By default, partitioning is done using keys. The MapReduce framework provides a HashPartitioner for partitioning
The kv pair of the ring buffer will need to call getPartition() before falling into disk, and we can see that it uses a rather clever method: First, we calculate the hashcode of the key, and then the number of a reduce in the module. In this case, we look at the figure above, the number of reduce is 4, then we modular 4 a number, the result will only be 4, that is, 0,1,2,3, so these four results will correspond to different buffers
All that is left is the Reduce Task to pull the data, which will be stored in memory at first, and will overflow and write to disk when there is no more space
Of course, if the setCombine operation is carried out at the beginning, it will be changed into (Dear,4). In the figure, because we illustrate with examples, in practice, each partition has many different words, and the merge operation will be carried out in the reduce operation, that is, the same keys are put together and sorted alphabetically.
Combine, Merge, and finally Reduce Task, these functions are the same, but they work in different stages to improve performance. As long as the business requirements are met, sometimes a single Map can solve the requirements, and sometimes both Map and Reduce phases are required.
The result of each Reduce task is written to a file in HDFS. After the Map task is completed, an appMaster is added to the word YARN to confirm polling and notify reduce Task to pull from the local disk. It is normal that a clear concept is formed only when more specific knowledge needs to be followed up.
2.7 Quadratic Sorting
MapReduce sorts and groups partitions based on keys. If you need to customize key types and key sorting rules, how to implement this?
public class Person implements WritableComparable<Person> { private String name; private int age; private int salary; public Person() { } public Person(String name, int age, int salary) { //super(); this.name = name; this.age = age; this.salary = salary; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public int getSalary() { return salary; } public void setSalary(int salary) { this.salary = salary; } @Override public String toString() { return this.salary + " " + this.age + " " + this.name; } // select * from time to time; Public int compareTo(Person o) {int compareResult1= this.salary - o.salary; if(compareResult1 ! = 0) { return -compareResult1; } else { return this.age - o.age; Public void write(DataOutput DataOutput) throws IOException {dataOutput.writeutf (name); dataOutput.writeutf (name); dataOutput.writeutf (name); dataOutput.writeInt(age); dataOutput.writeInt(salary); } // Use in to read the order of fields, Ensure that the order is consistent with that in the write method. Public void readFields(DataInput DataInput) throws IOException {// Read String this.name = dataInput.readUTF(); this.age = dataInput.readInt(); this.salary = dataInput.readInt(); }}Copy the code
Explain the content
2.8 Data Skew
Data skew is a common condition in data. Outliers will inevitably appear in the data and lead to data skew. These outliers can significantly slow down MapReduce execution. There are several common types of data skew:
- Data frequency skew – the amount of data in one area is much greater than in another. (Reduce tilt)
- Data size skew – Some records are much larger than average. (Map tilt)
Data skew may occur on both the Map and Reduce ends. Data skew on the map side makes processing of diverse data sets less efficient. Data skew on the Reduce side usually comes from the default MapReduce partition.
Data skew greatly lengthens the execution time of Map and Reduce tasks and consumes more memory resources for operations that need to cache data sets.
2.8.1 How to Diagnose Data Skew
- Focus on the data frequency skew in the output data from the map.
- How do I diagnose which keys in map output have data skew?
-
Add the ability to log details of map output keys to reduce methods
-
Once the existence of skew data is discovered, it is necessary to diagnose the keys that are causing the skew data. An easy way to do this is to track the maximum value of each key in your code. To reduce the amount of tracing, you can set a data volume threshold to track only those keys that are larger than the threshold and output them to the log.
-
8.2 Slowing down Data Skew on the Reduce End
-
Reduce data skew generally refers to the data frequency skew in map output data. That is, the data volume of some output keys is much larger than that of other output keys
-
How can I reduce the performance loss caused by data skew on the Reduce end?
① Sampling and range zoning
Hadoop's default partitioner is hash partitioning based on the Map output key. This is only good if the data is fairly evenly distributed. This is problematic when there is data skew. Using a partitioner requires first understanding the nature of the data. **TotalOrderPartitioner**, you can set partition boundaries by sampling the result set from the raw data. The range partitioner in the TotalOrderPartitioner can partition with preset partition boundary values. Therefore, it can also be used to correct data skew of partial keys in data.Copy the code
② Customize partitions
Another alternative to sampling and range partitioning is custom partitioning based on background knowledge of output keys. For example, if the map outputs the key's words from a book. Most of them are necessarily stopwords. The custom partition can then send this part of ellipsis to a fixed part of reduce instance. The rest is sent to the remaining Reduce instances.Copy the code
(3) Combine
Using Combine can greatly reduce the data frequency skew and data size skew. Where possible, Combine's purpose is to aggregate and simplify data. Combine is introduced in 48 kinds of technologies.Copy the code
4 Map connection and semi-connection
If the connected data set is too large to be used in the map-side connection. Consider the join optimization scheme for very large data sets described in Chapters 4 and 7.Copy the code
⑤ Custom data size skew strategy
Data size skewing on the Map or Reduce side affects the cache greatly and may even cause OutOfMemoryError exceptions. Dealing with this situation is not easy. Consider the following methods. - set the mapred. Linerecordreader. Maxlength to limit the maximum length of RecordReader to read. RecordReader is used in the TextInputFormat and KeyValueTextInputFormat classes. The default length has no upper limit. . - by org. Apache hadoop. Contrib. Utils. Join Settings cache data sets the maximum number of records. The default maximum number of cached records in Reduce is 100. - Consider using lossy data structures to compress data, such as Bloom filters.Copy the code
finally
MR is not divided into parts, it is very long, I hope you can read it patiently.
According to the sequence, Yarn is the next part to complete the process of big data.