MapReduce
Concepts
1. MapReduce Principle
- (Step 1) Split: what determines the number of map tasks
- As we know, files are stored in HDFS as blocks. So why does a split, rather than a block, correspond to a map task?
- A split is a logical division of blocks: a block's size is fixed when the file is stored and cannot be changed afterwards, which is inflexible, so splits are used to divide blocks logically
- For example, if a block is too large it slows down the computation; splitting it into smaller splits that map tasks process in parallel greatly shortens the computation time (see the sketch below)
- One split corresponds to one map task
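- A minimal sketch (the 64 MB figure is an assumed example, not from the original post) of how the maximum split size can be tuned in the job driver, which in turn changes how many map tasks are launched:
// In the job driver (see MyWordCount.main later in this post);
// FileInputFormat here is org.apache.hadoop.mapreduce.lib.input.FileInputFormat.
// Cap splits at 64 MB so that a single 128 MB block yields two splits,
// and therefore two map tasks that can run in parallel.
FileInputFormat.setMaxInputSplitSize(job, 64L * 1024 * 1024);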
- (Step 2) Map: key-value mapping
- The map function turns the input data into key-value pairs
- (Step 3) Shuffle
- The process of bringing all data with the same key together
- (Step 4) Reduce: distribution strategy and data processing
- Map groups data with the same key together. One Reduce can process multiple groups, but a single group is processed by exactly one Reduce and cannot be split across several Reduces (the number of Reduces is set on the job, as shown in the fragment below)
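- A tiny illustrative fragment (the value 2 is an assumed example) for the job driver, showing where the number of Reduce tasks is set; each group of identical keys then goes to exactly one of these Reduces:
// In the job driver: two Reduce tasks; the default HashPartitioner decides which one gets which key.
job.setNumReduceTasks(2);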
- A complete MapReduce process
- The memory buffer
- Function 1: Sort the data
- For example, the keys are Beijing, Shanghai and Nanjing, and the values are housing prices; the buffer first partitions the data, placing records into different partitions according to their key
- The data is then passed to Reduce, which can compute quickly on a per-partition basis (it is clear which Reduce processes which partition)
- Question 1: How are partitions determined?
- There are as many partitions as there are Reduce tasks
- For example, reduce1 handles the Beijing data, so its partition contains only Beijing records, while reduce2 handles the Shanghai and Nanjing data, so the second partition contains all the Shanghai and Nanjing records
- Question 2: How does the map know which key belongs to which partition?
- When map emits its key-value pairs, a partitioner computes the partition from the key (see the partitioner sketch below)
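- A minimal custom-partitioner sketch matching the housing-price example (the class name and city strings are assumptions for illustration). Hadoop uses HashPartitioner by default; a class like this is only needed when you want to control the key-to-partition mapping yourself:
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Sends all "Beijing" records to partition 0 (reduce1) and everything else
// (Shanghai, Nanjing, ...) to partition 1 (reduce2).
public class CityPartitioner extends Partitioner<Text, IntWritable> {
    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {
        if (numPartitions < 2) {
            return 0; // with a single reducer everything ends up in partition 0
        }
        return "Beijing".equals(key.toString()) ? 0 : 1;
    }
}
// Registered in the driver with:
// job.setPartitionerClass(CityPartitioner.class);
// job.setNumReduceTasks(2);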
- Sort the data a second time in memory
- Why is a second sort needed?
- If a Reduce only handled a single group of key-value pairs (one key), it could simply process them directly
- But if a Reduce handles several groups with different keys, the data would be mixed together inside one partition and Reduce would need a lot of disk interaction to separate them, so the buffer sorts the data within each partition a second time
- Function 2: Data compression
- For example, when computing the average of all housing prices, the map side can first add up its own records into a single partial result instead of handing every record to Reduce, which avoids massive disk interaction on the Reduce side (this map-side pre-aggregation is what a combiner does; see the sketch below)
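- A minimal combiner sketch (it reuses the Myreducer class from the WordCount demo later in this post, purely as an illustration). For plain sums and counts the reducer can double as the combiner, so partial results are computed on the map side before anything crosses the network; a true average would instead need a combiner that emits (sum, count) pairs, because an average of averages is not the overall average:
// In the job driver, next to setMapperClass/setReducerClass:
job.setCombinerClass(Myreducer.class); // Myreducer just sums IntWritable values, so it is safe to run as a combiner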
- How does Map send data to Reduce?
- A block is 128 MB by default, so the in-memory buffer fills up repeatedly while a map processes its data. Map does not send data to Reduce directly; instead it serializes (spills) the buffer contents into small files on disk, and those files are what Reduce eventually fetches. This matters because a Reduce may have to consume far more data (say 1 GB) than fits in memory at once
- The small files are sorted and then merged into one large, ordered file that is sent to Reduce
- When multiple Maps run in parallel, many small files are produced, and in the end they all have to be merge-sorted so that the data Reduce pulls is in order
- In short, Reduce receives its input in order (the related buffer settings are sketched below)
- Reduce must fetch all of its data before it begins the iterative computation
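- A small configuration sketch (the values are assumed examples, not from the original post) of the knobs that control this spill-and-merge behaviour; they can be set on the driver's Configuration object:
// In-memory sort buffer for map output; when it fills past a threshold, a spill file is written (default 100 MB).
conf.setInt("mapreduce.task.io.sort.mb", 200);
// How many spill files are merged in one pass when building the final ordered file (default 10).
conf.setInt("mapreduce.task.io.sort.factor", 50);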
- Example
2. MR v1.0 architecture
- Architecture diagram
- JobTracker
- The core, the master, and a single point
- Schedules all jobs
- Monitors the resource load across the whole cluster
- TaskTracker
- Manages the resources of its own node
- Heartbeats with JobTracker to report resources and fetch tasks
- Client
- Works at the granularity of a job
- Plans how the job's computation is distributed
- Submits the job's resources to HDFS
- Finally submits the job to JobTracker
- The disadvantages of MR v1.0
- JobTracker: heavy load and a single point of failure
- Resource management and computational scheduling are strongly coupled
- Other computing frameworks have to re-implement resource management themselves
- For example, MapReduce assigns work to a DataNode; another computing framework cannot know that the DataNode is already busy and assigns work to it again, overloading the node
- As a result, resources cannot be managed globally across different frameworks
- MR v1.0 processing flow
- Example: count the number of occurrences of the word "refund"
3. MR v2.0 (YARN) architecture
- Architecture diagram
- ResourceManager
- The master, the core
- Manages the resources of the cluster's nodes
- Long-running service
- Note: ResourceManager itself is a single point, so ZooKeeper is needed to set up HA
- NodeManager
- Long-running service
- Runs on each node and reports the node's resource status to ResourceManager
- Manages the Container lifecycle
- Container
- If a task (for example a Reduce task) uses more memory than its Container allows (1 GB by default), the task is killed (the related memory settings are sketched below)
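- A small sketch (the values are assumed examples) of the per-task memory requests a MapReduce job can put in its Configuration; YARN sizes the Containers from these values, and a task that exceeds its Container is killed:
// Ask YARN for 1 GB Containers for map tasks and 2 GB for reduce tasks (both default to 1024 MB).
conf.setInt("mapreduce.map.memory.mb", 1024);
conf.setInt("mapreduce.reduce.memory.mb", 2048);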
- ApplicationMaster
- Assigns tasks, but the tasks do not run immediately; it must first request resources from ResourceManager
- Short-lived service; an ApplicationMaster is started for each submitted job
- This avoids JobTracker's single point of failure
- Client
- Sets up and submits jobs
- YARN: Yet Another Resource Negotiator
- A new resource management system introduced in Hadoop 2.0, evolved directly from MRv1
- Core idea: The resource management and task scheduling functions of JobTracker in MRv1 are separated and implemented by ResourceManager and ApplicationMaster respectively
- ResourceManager: Manages and schedules resources in a cluster
- ApplicationMaster: Responsible for application-related transactions, such as task scheduling, task monitoring, and fault tolerance
- YARN enables multiple computing frameworks to run in a cluster
- Each application corresponds to an ApplicationMaster
- At present, multiple computing frameworks can run on YARN, such as MapReduce, Spark, and Storm
Cluster setup
- The official documentation
1. Where to place the service nodes
- NM (NodeManager): manages the node's resources and works directly with the data, so it must be placed on the DataNodes
- RM (ResourceManager): can be placed freely, but it is better to run it on a different node from the NameNode
2. YARN on Single Node
- Modify etc/hadoop/mapred-site.xml
- In the configuration directory there is no mapred-site.xml, only mapred-site.xml.template; rename it to mapred-site.xml
Add the following:
<configuration>
    <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
    </property>
</configuration>
- Modify etc/hadoop/yarn-site.xml
Add the following:
<property>
    <name>yarn.nodemanager.aux-services</name>
    <value>mapreduce_shuffle</value>
</property>
- Add the HA configuration to etc/hadoop/yarn-site.xml
<property>
    <name>yarn.resourcemanager.ha.enabled</name>
    <value>true</value>
</property>
<!-- This id has nothing to do with the clusterID of HDFS; it is the id of the ResourceManager cluster -->
<property>
    <name>yarn.resourcemanager.cluster-id</name>
    <value>cluster1</value>
</property>
<!-- These two logical names are used to find the ResourceManager instances -->
<property>
    <name>yarn.resourcemanager.ha.rm-ids</name>
    <value>rm1,rm2</value>
</property>
<!-- Which two hosts run ResourceManager (node3 and node4 in this cluster) -->
<property>
    <name>yarn.resourcemanager.hostname.rm1</name>
    <value>node3</value>
</property>
<property>
    <name>yarn.resourcemanager.hostname.rm2</name>
    <value>node4</value>
</property>
<!-- ZooKeeper node locations -->
<property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>node2:2181,node3:2181,node4:2181</value>
</property>
- Configure passwordless SSH login between node3 and node4
- Because ResourceManager HA is configured on them and they may fail over to each other, passwordless SSH login between the two is required
3. Startup
- Start ZooKeeper
zkServer.sh start
- Start the HDFS cluster
start-dfs.sh
- After startup I found a problem: both NameNodes were standby and the ZKFC process was not started
- If ZKFC is not running, start it with the following command
hadoop-daemon.sh start zkfc
- Start YARN
start-yarn.sh
- After this, NodeManager processes appear on the DataNodes, but the ResourceManager processes are not started yet
- Start ResourceManager
- Run this command on each node where ResourceManager is configured, in this case node3 and node4
yarn-daemon.sh start resourcemanager
- Status of each node
#node1
[root@node1 ~]# jps
7426 Jps
7111 DFSZKFailoverController
6680 JournalNode
6491 NameNode
#node2
[root@node2 hadoop]# jps
6385 QuorumPeerMain
7269 Jps
6438 NameNode
6502 DataNode
7159 NodeManager
6891 DFSZKFailoverController
6590 JournalNode
#node3
[root@node3 ~]# jps
6401 QuorumPeerMain
6455 DataNode
6841 ResourceManager
6540 JournalNode
6718 NodeManager
6879 Jps
#node4
[root@node4 ~]# jps
6769 ResourceManager
6453 DataNode
6646 NodeManager
7002 Jps
6399 QuorumPeerMain
- Shutting down
- Stop the ResourceManager process
- Note: run the stop command on both machines (node3, node4)
- stop-all.sh can then be used to close all the remaining processes
- Finally, stop ZooKeeper
4. Access ResourceManager from a browser
- Because ResourceManager is configured with HA, one node is active and the other is standby
- Connect directly to node4 at node4:8088
5. Build a WordCount demo with IDEA
- The source code
package com.sju.mr.wc;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class MyWordCount {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf);
        job.setJarByClass(MyWordCount.class);

        // Set the input and output paths
        Path inPath = new Path("/user/test01/haha.txt");
        FileInputFormat.addInputPath(job, inPath);
        Path outPath = new Path("/output/wordcount");
        // If the output path already exists, delete it first (otherwise the job fails)
        if (outPath.getFileSystem(conf).exists(outPath))
            outPath.getFileSystem(conf).delete(outPath, true);
        FileOutputFormat.setOutputPath(job, outPath);

        // Mapper: produces the intermediate (word, 1) key-value pairs
        job.setMapperClass(MyMapper.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Reducer: sums the counts for each word
        job.setReducerClass(Myreducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Submit the job and wait for it to finish
        job.waitForCompletion(true);
    }
}
----------------------------------------------------------------
package com.sju.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;
import java.util.StringTokenizer;

/*
 * The four generic parameters of Mapper:
 * KEYIN:    the offset of the first character of the line (0, 1, 2, ... similar to an array index)
 * VALUEIN:  the whole line of text
 * KEYOUT:   a single word
 * VALUEOUT: a number (the count 1)
 */
public class MyMapper extends Mapper<Object, Text, Text, IntWritable> {

    private static final IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // value is one line of the input text; split it into words
        StringTokenizer itr = new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            // Each token is a plain String that needs to be wrapped in a Text object
            word.set(itr.nextToken());
            // word is a Text, one is an IntWritable whose value is 1
            context.write(word, one);
        }
    }
}
----------------------------------------------------------------
package com.sju.mr.wc;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/*
 * The four generic parameters of Reducer:
 * input key = a word, input value = a count, output key = the word, output value = its total number of occurrences
 */
public class Myreducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    // Result of the iterative calculation
    private IntWritable result = new IntWritable();

    // One key comes with multiple values; because the map output is sorted,
    // reduce finishes one key's group and then moves on to the next key.
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable val : values) {
            sum += val.get();
        }
        result.set(sum);
        // This group of key-value pairs has been processed; write out the result
        context.write(key, result);
    }
}
- How do you export the project as a JAR package in IDEA?
- Open File > Project Structure > Artifacts
- Add an Artifact (+ > JAR > Empty)
- Enter the name of the jar (here 'wc'); the dialog also shows the path where the jar package will be generated
- Add the module output to wc.jar in the Output Layout (+ > Module Output, or double-click the corresponding module in the Available Elements box)
- Select the module of your project source file
- Then click OK to exit the Project Structure panel
- Finally, generate the JAR (Build > Build Artifacts… > sampleName > Build/Rebuild)
- Place the JAR package in any directory on Linux
- Run the following command
hadoop jar wc.jar com.sju.mr.wc.MyWordCount
- Finally, the output files are generated at the path configured in the Java code
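- Each Reduce task writes one part-r-XXXXX file under /output/wordcount; with the default TextOutputFormat, every line of the output is a word and its count separated by a tab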