This is the 16th day of my participation in the Gwen Challenge in November. For details, see: "The Last Gwen Challenge in 2021".

Data file used in the example: click to download.

  • Mapper
/ * * *@author jacques huang
 * @date10 November 2021 */
@Log4j
public class OrderMapper extends Mapper<Object.Text.Text.OrderModel> {
    private final OrderModel MODEL = new OrderModel();
    private final Text KEY = new Text();

    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {

            String[] order = value.toString().split(",");
            MODEL.setOrderId(Long.valueOf(order[0]));
            MODEL.setTotalMoney(Double.parseDouble(order[1]));
            MODEL.setRealMoney(Double.parseDouble(order[2]));
            MODEL.setAddress(order[3]);
            MODEL.setCreateTime(StringUtils.isNotBlank(order[4])? LocalDateTime.parse(order[4], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null);
            MODEL.setPaysTime(StringUtils.isNotBlank(order[5])? LocalDateTime.parse(order[5], DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null);
            MODEL.setCacelMoney(Double.parseDouble(order[6]));
            KEY.set(MODEL.getAddress());
            log.info("The value of the model is+MODEL); context.write(KEY, MODEL); }}Copy the code
  • Reduce
/ * * *@author jacques huang
 * @date10 November 2021 */
public class OrderReduce  extends Reducer<Text.OrderModel.Text.OrderOutModel> {
    @Override
    protected void reduce(Text key, Iterable<OrderModel> values, Context context) throws IOException, InterruptedException {
        OrderOutModel model=new OrderOutModel();
        for (OrderModel value : values) {
            Double totalMoney = Optional.ofNullable(model.getTotalMoney()).orElse(0D);
            Integer totalOrder = Optional.ofNullable(model.getTotalOrder()).orElse(0);
            model.setTotalMoney(totalMoney+ value.getTotalMoney());
            model.setTotalOrder(totalOrder+ 1); } context.write(key,model); }}Copy the code
  • Model
/ * * *@author jacques huang
 * @date10 November 2021 */
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class OrderModel implements Writable {
Serialization and reserialization are implemented by implementing Writable and overriding the write and readFields methods
    private Long orderId;
    private Double totalMoney;
    private Double realMoney;
    private String address;
    private LocalDateTime createTime;
    private LocalDateTime paysTime;
    private Double cacelMoney;

    @Override
    public void write(DataOutput out) throws IOException {
    
	/** * write processing */
        out.writeLong(orderId);
        out.writeDouble(totalMoney);
        out.writeDouble(realMoney);
        out.writeUTF(address);
        if(createTime ! =null) {
            out.writeUTF(createTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        } else {
            out.writeUTF("");
        }
        if(paysTime ! =null) {
            out.writeUTF(paysTime.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
        } else {
            out.writeUTF("");
        }
        out.writeDouble(cacelMoney);
    }
    
    @Override
    public void readFields(DataInput in) throws IOException {
        /** * read processing */
        this.orderId = in.readLong();
        this.totalMoney = in.readDouble();
        this.realMoney = in.readDouble();
        this.address = in.readUTF();
         String  createTimeStr = in.readUTF();
        this.createTime = StringUtils.isNotBlank(createTimeStr)? LocalDateTime.parse(createTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null;
        String  paysTimeStr = in.readUTF();
        this.paysTime = StringUtils.isNotBlank(paysTimeStr)? LocalDateTime.parse(paysTimeStr, DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")) :null;;
        this.cacelMoney = in.readDouble(); }}Copy the code
/ * * *@author jacques huang
 * @date10 November 2021 */
@Data
@AllArgsConstructor
@NoArgsConstructor
@Builder
public class OrderOutModel implements Writable {
Serialization and reserialization are implemented by implementing Writable and overriding the write and readFields methods
    /** * Total amount */
    private  Double totalMoney;
    /** ** ** /
    private  Integer totalOrder;
    @Override
    public void write(DataOutput out) throws IOException {
        /** * write processing */
        out.writeDouble(totalMoney);
        out.writeInt(totalOrder);
    }

    @Override
    public void readFields(DataInput in) throws IOException {
        /** * read processing */
        this.totalOrder=in.readInt();
        this.totalMoney=in.readDouble();
    }

    @Override
    public String toString(a) {
        /**
         * 继承了Writable输出会以这个toString为输出格式
         */
        return "\t Total transaction amount:" + totalMoney +"\t Total number of transactions:+ totalOrder; }}Copy the code

MapReduceOrderApplication (the entry-point class that configures and submits the job)

public class MapReduceOrderApplication {
      public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
          System.setProperty("hadoop.home.dir"."E: \ \ winutils - master \ \ hadoop - server \ \");
          Configuration conf = new Configuration(true);
          /** * lets the framework know that Windows heterogeneous platforms running on Windows require */
          conf.set("mapreduce.app-submission.cross-platform"."true");

          Job job = Job.getInstance(conf);

          /** * specifies the jar package path */
          job.setJar(F: \ \ "sourceCode \ \ hadoop \ \ graphs - order - demo \ \ target \ \ graphs - order - demo - 1.0. The jar." ");
          /** * Specifies the program's startup class */
          job.setJarByClass(MapReduceOrderApplication.class);
          /** * Specifies the task name */
          job.setJobName("example-order-demo0003");
          /** * Enter data */
          Path infile = new Path("/data/order");
          TextInputFormat.addInputPath(job, infile);

          /** * output data */
          Path outfile = new Path("/result/order/demo0001");
          if (outfile.getFileSystem(conf).exists(outfile)) {
              outfile.getFileSystem(conf).delete(outfile, true);
          }
          TextOutputFormat.setOutputPath(job, outfile);
          /** * mapper configuration */
          job.setMapperClass(OrderMapper.class);
          /** * Type configuration */
          job.setMapOutputKeyClass(Text.class);
          job.setMapOutputValueClass(OrderModel.class);
          /** *Reduce configuration */
          job.setReducerClass(OrderReduce.class);
          job.setOutputKeyClass(Text.class);
          job.setOutputValueClass(OrderOutModel.class);
          /** * Task submission */
          job.waitForCompletion(true); }}Copy the code
  • The output
Total amount of transactions in Shanghai: 544907.6300000018 Total number of transactions: 3353 Yunnan Province total amount of transactions: 75769.32000000007 Total number of transactions: 778 Inner Mongolia Autonomous Region total amount of transactions: 36827.0 Total number of transactions: 215 Beijing total amount of transactions: Total number of transactions: 2054 Jilin Province Total amount of transactions: 42040.92000000002 Total number of transactions: 401 Sichuan Province Total amount of transactions: 188948.11999999985 Total number of transactions: 2019 Tianjin Total amount of transactions: Total number of transactions: 1153 Ningxia Hui Autonomous Region Total amount of transactions: 4804.92 Total number of transactions: 42 Anhui Province total amount of transactions: 61378.67000000005 Total number of transactions: 609 Shandong Province total amount of transactions: Total number of transactions: 1804 Total amount of transactions in Shanxi Province: 46568.800000000025 Total number of transactions: 465 Total amount of transactions in Guangdong Province: 227855.27999999962 Total number of transactions: Total value of transactions in Guangxi Zhuang Autonomous Region: 35140.10000000001 Total number of transactions: 436 Total value of transactions in Xinjiang Uygur Autonomous Region: 10112.9 Total number of transactions: 58 Total value of transactions in Jiangsu Province: 227930.92999999895 Total number of transactions: Total amount of transactions in Jiangxi Province: 36791.65 Total number of transactions: 411 Hebei Province total amount of transactions: 106561.5599999999 Total number of transactions: 1083 Henan Province total amount of transactions: 90619.72000000003 Total number of transactions: 966 Zhejiang Province, Total value of transactions: 203126.9599999989, Total number of transactions: 2061 Hainan Province, Total value of transactions: 16828.18, Total number of transactions: 178, Hubei Province, Total value of transactions: 8581.7, Total number of 
transactions: 75, Hunan Province, Total value of transactions: Total number of transactions: 1099 Gansu Province, Total amount of transactions: 14294.7599999998, Total number of transactions: 167 Fujian Province, Total amount of transactions: 37075.53000000001, Total number of transactions: 489 Tibet Autonomous Region, Total value: Total number of transactions: 3 Guizhou Province, Total amount of transactions: 32274.16, Total number of transactions: 345 Liaoning Province, Total amount of transactions: 107355.9299999999993, Total number of transactions: 1187 chongqing, Total amount of transactions: 108975.65000000007, Total number of transactions: Total amount of transactions in Shaanxi Province: 59450.93000000003 Total number of transactions: 536 Total amount of transactions in Qinghai Province: 2396.2 Total number of transactions: 19 Total amount of transactions in Heilongjiang Province: 35058.29000000001 Total number of transactions: 379Copy the code