This is the 16th day of my participation in the Gwen Challenge in November. For details, see "The Last Gwen Challenge in 2021".
Data file used in the example: click to download.
- Mapper
/ * * *@author jacques huang
* @date10 November 2021 */
@Log4j
public class OrderMapper extends Mapper<Object.Text.Text.OrderModel> {
private final OrderModel MODEL = new OrderModel();
private final Text KEY = new Text();
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String[] order = value.toString().split(",");
MODEL.setOrderId(Long.valueOf(order[0]));
MODEL.setTotalMoney(Double.parseDouble(order[1]));
MODEL.setRealMoney(Double.parseDouble(order[2]));
MODEL.setAddress(order[3]);
MODEL.setCreateTime(StringUtils.isNotBlank(order[4])? LocalDateTime.parse(order[4], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null);
MODEL.setPaysTime(StringUtils.isNotBlank(order[5])? LocalDateTime.parse(order[5], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null);
MODEL.setCacelMoney(Double.parseDouble(order[6]));
KEY.set(MODEL.getAddress());
log.info("The value of the model is+MODEL); context.write(KEY, MODEL); }}Copy the code
- Reduce
/ * * *@author jacques huang
* @date10 November 2021 */
public class OrderReduce extends Reducer<Text.OrderModel.Text.OrderOutModel> {
@Override
protected void reduce(Text key, Iterable<OrderModel> values, Context context) throws IOException, InterruptedException {
OrderOutModel model=new OrderOutModel();
for (OrderModel value : values) {
Double totalMoney = Optional.ofNullable(model.getTotalMoney()).orElse(0D);
Integer totalOrder = Optional.ofNullable(model.getTotalOrder()).orElse(0);
model.setTotalMoney(totalMoney+ value.getTotalMoney());
model.setTotalOrder(totalOrder+ 1); } context.write(key,model); }}Copy the code
- Model
/ * * *@author jacques huang
* @date10 November 2021 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderModel implements Writable {
Serialization and reserialization are implemented by implementing Writable and overriding the write and readFields methods
private Long orderId;
private Double totalMoney;
private Double realMoney;
private String address;
private LocalDateTime createTime;
private LocalDateTime paysTime;
private Double cacelMoney;
@Override
public void write(DataOutput out) throws IOException {
/** * write processing */
out.writeLong(orderId);
out.writeDouble(totalMoney);
out.writeDouble(realMoney);
out.writeUTF(address);
if(createTime ! =null) {
out.writeUTF(createTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
} else {
out.writeUTF("");
}
if(paysTime ! =null) {
out.writeUTF(paysTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
} else {
out.writeUTF("");
}
out.writeDouble(cacelMoney);
}
@Override
public void readFields(DataInput in) throws IOException {
/** * read processing */
this.orderId = in.readLong();
this.totalMoney = in.readDouble();
this.realMoney = in.readDouble();
this.address = in.readUTF();
String createTimeStr = in.readUTF();
this.createTime = StringUtils.isNotBlank(createTimeStr)? LocalDateTime.parse(createTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null;
String paysTimeStr = in.readUTF();
this.paysTime = StringUtils.isNotBlank(paysTimeStr)? LocalDateTime.parse(paysTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null;;
this.cacelMoney = in.readDouble(); }}Copy the code
/ * * *@author jacques huang
* @date10 November 2021 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class OrderOutModel implements Writable {
Serialization and reserialization are implemented by implementing Writable and overriding the write and readFields methods
/** * Total amount */
private Double totalMoney;
/** ** ** /
private Integer totalOrder;
@Override
public void write(DataOutput out) throws IOException {
/** * write processing */
out.writeDouble(totalMoney);
out.writeInt(totalOrder);
}
@Override
public void readFields(DataInput in) throws IOException {
/** * read processing */
this.totalOrder=in.readInt();
this.totalMoney=in.readDouble();
}
@Override
public String toString(a) {
/**
* 继承了Writable输出会以这个toString为输出格式
*/
return "\t Total transaction amount:" + totalMoney +"\t Total number of transactions:+ totalOrder; }}Copy the code
- MapReduceOrderApplication (the driver class that configures and submits the job)
public class MapReduceOrderApplication {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
System.setProperty("hadoop.home.dir"."E: \ \ winutils - master \ \ hadoop - server \ \");
Configuration conf = new Configuration(true);
/** * lets the framework know that Windows heterogeneous platforms running on Windows require */
conf.set("mapreduce.app-submission.cross-platform"."true");
Job job = Job.getInstance(conf);
/** * specifies the jar package path */
job.setJar(F: \ \ "sourceCode \ \ hadoop \ \ graphs - order - demo \ \ target \ \ graphs - order - demo - 1.0. The jar." ");
/** * Specifies the program's startup class */
job.setJarByClass(MapReduceOrderApplication.class);
/** * Specifies the task name */
job.setJobName("example-order-demo0003");
/** * Enter data */
Path infile = new Path("/data/order");
TextInputFormat.addInputPath(job, infile);
/** * output data */
Path outfile = new Path("/result/order/demo0001");
if (outfile.getFileSystem(conf).exists(outfile)) {
outfile.getFileSystem(conf).delete(outfile, true);
}
TextOutputFormat.setOutputPath(job, outfile);
/** * mapper configuration */
job.setMapperClass(OrderMapper.class);
/** * Type configuration */
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(OrderModel.class);
/** *Reduce configuration */
job.setReducerClass(OrderReduce.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(OrderOutModel.class);
/** * Task submission */
job.waitForCompletion(true); }}Copy the code
- The output
Shanghai: total transaction amount: 544907.6300000018, total number of transactions: 3353
Yunnan Province: total transaction amount: 75769.32000000007, total number of transactions: 778
Inner Mongolia Autonomous Region: total transaction amount: 36827.0, total number of transactions: 215
Beijing: total transaction amount: [missing in source], total number of transactions: 2054
Jilin Province: total transaction amount: 42040.92000000002, total number of transactions: 401
Sichuan Province: total transaction amount: 188948.11999999985, total number of transactions: 2019
Tianjin: total transaction amount: [missing in source], total number of transactions: 1153
Ningxia Hui Autonomous Region: total transaction amount: 4804.92, total number of transactions: 42
Anhui Province: total transaction amount: 61378.67000000005, total number of transactions: 609
Shandong Province: total transaction amount: [missing in source], total number of transactions: 1804
Shanxi Province: total transaction amount: 46568.800000000025, total number of transactions: 465
Guangdong Province: total transaction amount: 227855.27999999962, total number of transactions: [missing in source]
Guangxi Zhuang Autonomous Region: total transaction amount: 35140.10000000001, total number of transactions: 436
Xinjiang Uygur Autonomous Region: total transaction amount: 10112.9, total number of transactions: 58
Jiangsu Province: total transaction amount: 227930.92999999895, total number of transactions: [missing in source]
Jiangxi Province: total transaction amount: 36791.65, total number of transactions: 411
Hebei Province: total transaction amount: 106561.5599999999, total number of transactions: 1083
Henan Province: total transaction amount: 90619.72000000003, total number of transactions: 966
Zhejiang Province: total transaction amount: 203126.9599999989, total number of transactions: 2061
Hainan Province: total transaction amount: 16828.18, total number of transactions: 178
Hubei Province: total transaction amount: 8581.7, total number of transactions: 75
Hunan Province: total transaction amount: [missing in source], total number of transactions: 1099
Gansu Province: total transaction amount: 14294.7599999998, total number of transactions: 167
Fujian Province: total transaction amount: 37075.53000000001, total number of transactions: 489
Tibet Autonomous Region: total transaction amount: [missing in source], total number of transactions: 3
Guizhou Province: total transaction amount: 32274.16, total number of transactions: 345
Liaoning Province: total transaction amount: 107355.9299999999993, total number of transactions: 1187
Chongqing: total transaction amount: 108975.65000000007, total number of transactions: [missing in source]
Shaanxi Province: total transaction amount: 59450.93000000003, total number of transactions: 536
Qinghai Province: total transaction amount: 2396.2, total number of transactions: 19
Heilongjiang Province: total transaction amount: 35058.29000000001, total number of transactions: 379