This article has participated in the weekend learning program, click the link to see more details: juejin.cn/post/696572…
Hadoop’s MapReduce framework solves traffic statistics cases: The format of a text file to be collected is as follows:
1363157985066 13726238888 00-fD-07-A4-72-b8 :CMCC 120.196.100.82 i02.c.aliimg.com 24 27 2481 24681 200 1363157993055 13560436666 C4-17-FE-BA-de-D9 :CMCC 120.196.100.99 18 15 1116 954 200Copy the code
The first column is the timestamp, the second column is the phone number, the second column is the downstream traffic, and the third column is the upstream traffic. You need to use mobile phones as keys to collect statistics on the total upstream traffic, total downstream traffic, and total traffic data used by each mobile phone.
The project structure is as follows:
-flowCount
-FlowBean.java
-FlowCountMapper.java
-FlowCountReducer.java
-JobSubmitter.java
Copy the code
Flowbean.java: Construct flowCountmapper.java with upstream, downstream, and total flow properties: Flowcountreducer. Java: collects statistics on the traffic usage of each mobile phone number. Jobsubmitter. Java: submits tasks
Flowbean.java:
package flowCount; import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; import org.apache.hadoop.io.Writable; public class FlowBean implements Writable{ private int upFlow; private int dFlow; private int amountFlow; private String phone; public FlowBean() {} public FlowBean(String phone, int upFlow, int dFlow) { this.phone = phone; this.upFlow = upFlow; this.dFlow = dFlow; this.amountFlow = upFlow + dFlow; } public int getUpFlow() { return upFlow; } public void setUpFlow(int upFlow) { this.upFlow = upFlow; } public int getdFlow() { return dFlow; } public void setdFlow(int dFlow) { this.dFlow = dFlow; } public int getAmountFlow() { return amountFlow; } public void setAmountFlow(int amountFlow) { this.amountFlow = amountFlow; } public void write(DataOutput out) throws IOException { // TODO Auto-generated method stub out.writeInt(upFlow); out.writeUTF(phone); out.writeInt(dFlow); out.writeInt(amountFlow); } public void readFields(DataInput in) throws IOException { // TODO Auto-generated method stub this.upFlow = in.readInt(); this.phone = in.readUTF(); this.dFlow = in.readInt(); this.amountFlow = in.readInt(); } public String getPhone() { return phone; } public void setPhone(String phone) { this.phone = phone; } @Override public String toString() { return this.upFlow + "," + this.dFlow + "," + this.amountFlow; }}Copy the code
FlowCountMapper.java
package flowCount; import java.io.IOException; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Mapper.Context; public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> { protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { String line = value.toString(); String[] fields = line.split("\t"); String phone = fields[1]; int upFlow = Integer.parseInt(fields[fields.length - 3]); int dFlow = Integer.parseInt(fields[fields.length - 2]); context.write(new Text(phone), new FlowBean(phone, upFlow, dFlow)); }}Copy the code
FlowCountReducer.java
package flowCount; import java.io.IOException; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Reducer; public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean>{ @Override protected void reduce(Text key, Iterable<FlowBean> values, Reducer<Text, FlowBean, Text, FlowBean>.Context context) throws IOException, InterruptedException { int upSum = 0; int dSum = 0; for(FlowBean value:values){ upSum += value.getUpFlow(); dSum += value.getdFlow(); } context.write(key, new FlowBean(key.toString(), upSum, dSum)); }}Copy the code
JobSubmitter.java
package flowCount; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; public class JobSubmitter { public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); Job job = Job.getInstance(conf); job.setJarByClass(JobSubmitter.class); job.setMapperClass(FlowCountMapper.class); job.setReducerClass(FlowCountReducer.class); job.setNumReduceTasks(3); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); FileInputFormat.setInputPaths(job, new Path("/mrdata/flow/input")); FileOutputFormat.setOutputPath(job, new Path("/mrdata/flow/output")); job.waitForCompletion(true); }}Copy the code
Running steps
- Submit a text file whose statistics are to be collected to the HDFS
Hadoop fs -put Local file path HDFS Absolute pathCopy the code
Jar. 3. Run the following command
hadoop jar flowCount.jar flowCount.JobSubmitter
Copy the code