Problem description

The same SQL query runs many times slower on COS than on HDFS.

Problem analysis

1. Test SQL statement:

SELECT count(1) FROM test WHERE date='20180720';

2. Compare the two jobs

(1) Mapper count of the HDFS job



(2) Mapper count of the COS job



Comparing the two jobs shows that the COS job has far fewer mappers.
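
The mapper count of an MR job equals the number of input splits. As a rough offline sanity check, a small driver can ask a CombineFileInputFormat subclass how many splits it would build for the table directory. The sketch below is only an approximation of what Hive does: it uses CombineTextInputFormat as a stand-in for Hive's combine input format, and the path argument is hypothetical.

import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.CombineTextInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

public class SplitCountProbe {
  public static void main(String[] args) throws Exception {
    Job job = Job.getInstance(new Configuration());
    // args[0]: the table's data directory, e.g. an hdfs:// or cosn:// path
    // (the relevant FileSystem connector must be on the classpath)
    FileInputFormat.addInputPath(job, new Path(args[0]));
    // 256 MB: the same ceiling as mapred.max.split.size in this article
    FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);
    List<InputSplit> splits = new CombineTextInputFormat().getSplits(job);
    // the job launches one mapper per split
    System.out.println("splits (== mappers): " + splits.size());
  }
}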

Source code analysis

How CombineFileInputFormat creates splits

void createSplits(Map<String, Set<OneBlockInfo>> nodeToBlocks,
                   Map<OneBlockInfo, String[]> blockToNodes,
                   Map<String, List<OneBlockInfo>> rackToBlocks,
                   long totLength,
                   long maxSize,
                   long minSizeNode,
                   long minSizeRack,
                   List<InputSplit> splits                    
                  ) {
  ArrayList<OneBlockInfo> validBlocks = new ArrayList<OneBlockInfo>();
  long curSplitSize = 0;
   
  int totalNodes = nodeToBlocks.size();
  long totalLength = totLength;
 
  Multiset<String> splitsPerNode = HashMultiset.create();
  Set<String> completedNodes = new HashSet<String>();
   
  while(true) {
    //1. -------------------- Step 1: process nodeToBlocks node by node --------------------
    //Note: if the data is on HDFS, the nodes are the datanodes holding the file's blocks, so there
    //are usually many of them, and nodeToBlocks maps datanode -> blocks.
    //If the data is on COS, however, the only node is localhost(!), so nodeToBlocks maps
    //localhost -> ALL blocks.
    //Because createSplits walks nodeToBlocks node by node, on COS every block piles up on localhost
    //instead of being spread out, so this step very likely produces a few large splits.
    //On HDFS the blocks are scattered across many nodes, which tends to produce many smaller
    //splits, hence the significant difference in split counts.
    for (Iterator<Map.Entry<String, Set<OneBlockInfo>>> iter = nodeToBlocks
        .entrySet().iterator(); iter.hasNext();) {
      //one entry per node
      Map.Entry<String, Set<OneBlockInfo>> one = iter.next();
       
      String node = one.getKey();
       
      // Skip the node if it has previously been marked as completed.
      if (completedNodes.contains(node)) {
        continue;
      }
 
      //all blocks on this node
      Set<OneBlockInfo> blocksInCurrentNode = one.getValue();
 
      //(1) Process all blocks on this node; each batch is removed once it has been handled.
      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in two different splits.
      Iterator<OneBlockInfo> oneBlockIter = blocksInCurrentNode.iterator();
      while (oneBlockIter.hasNext()) {
        OneBlockInfo oneblock = oneBlockIter.next();
         
        // Remove all blocks which may already have been assigned to other splits.
        //Note: nodes are processed one at a time and a block can live on multiple nodes, so the
        //same block may be encountered more than once. A processed block is removed from
        //blockToNodes, so if oneblock is no longer there it has already been handled: skip it.
        if(!blockToNodes.containsKey(oneblock)) {
          oneBlockIter.remove();
          continue;
        }
 
        //collect the block into validBlocks
        validBlocks.add(oneblock);
 
        //once a block has been processed, remove it from blockToNodes(!)
        blockToNodes.remove(oneblock);
 
        curSplitSize += oneblock.length;
 
        //once the accumulated size reaches maxSize (256 MB by default), cut one split
        // if the accumulated split size exceeds the maximum, then create this split.
        if (maxSize != 0 && curSplitSize >= maxSize) {
          // create an input split and add it to the splits array
          addCreatedSplit(splits, Collections.singleton(node), validBlocks);
          totalLength -= curSplitSize;
          curSplitSize = 0;
 
          splitsPerNode.add(node);
 
          // Remove entries from blocksInNode so that we don't walk these
          // again.
          blocksInCurrentNode.removeAll(validBlocks);
          validBlocks.clear();
 
          // Done creating a single split for this node. Move on to the next
          // node so that splits are distributed across nodes.
          break;
        }
      }
 
 
      //(2) The node's blocks total less than maxSize, or some blocks are left over after splitting.
      if (validBlocks.size() != 0) {
        // This implies that the last few blocks (or all in case maxSize=0)
        // were not part of a split. The node is complete.
         
        // if there were any blocks left over and their combined size is
        // larger than minSplitNode, then combine them into one split.
        // Otherwise add them back to the unprocessed pool. It is likely
        // that they will be combined with other blocks from the
        // same rack later on.
        // This condition also kicks in when max split size is not set. All
        // blocks on a node will be grouped together into a single split.
        if (minSizeNode != 0 && curSplitSize >= minSizeNode && splitsPerNode.count(node) == 0) {
          // haven't created any split on this machine. so its ok to add a
          // smaller one for parallelism. Otherwise group it in the rack for
          // balanced size create an input split and add it to the splits array
          //If this node has no split yet and curSplitSize >= minSizeNode, group the leftover
          //blocks into one split.
          //Note: this split is smaller than 256 MB(!)
          addCreatedSplit(splits, Collections.singleton(node), validBlocks);
          totalLength -= curSplitSize;
 
          //record that this node now has a split
          splitsPerNode.add(node);
 
          //remove the blocks that went into the split from blocksInCurrentNode
          // Remove entries from blocksInNode so that we don't walk this again.
          blocksInCurrentNode.removeAll(validBlocks);
          // The node is done. This was the last set of blocks for this node.
        } else {
          // Put the unplaced blocks back into the pool for later rack-allocation.
          for (OneBlockInfo oneblock : validBlocks) {
            blockToNodes.put(oneblock, oneblock.hosts);
          }
        }
        validBlocks.clear();
        curSplitSize = 0;
 
        //mark this node as done
        completedNodes.add(node);
      } else { // No in-flight blocks.
        if (blocksInCurrentNode.size() == 0) {
          // Node is done. All blocks were fit into node-local splits.
          completedNodes.add(node);
        } // else Run through the node again.
      }
    }
 
 
    //Either every node has been processed or every block has been assigned.
    //Note: on COS there is only the single localhost node, so completedNodes.size() == totalNodes
    //is true here while totalLength == 0 is false, i.e. some blocks remain unassigned.
    // Check if node-local assignments are complete.
    if (completedNodes.size() == totalNodes || totalLength == 0) {
      // All nodes have been walked over and marked as completed or all blocks
      // have been assigned. The rest should be handled via rackLock assignment.
      LOG.info("DEBUG: Terminated node allocation with : CompletedNodes: "
          + completedNodes.size() + ", size left: " + totalLength);
      break;
    }
  }
 
 
  //2. -------------------- Step 2: blocks left unassigned above are handled here, this time by iterating rackToBlocks --------------------
  //If no rack topology is configured there is only one entry, e.g. /default-rack -> {ArrayList@10687}  size = 512
  // if blocks in a rack are below the specified minimum size, then keep them
  // in 'overflow'. After the processing of all racks is complete, these
  // overflow blocks will be combined into splits.
  ArrayList<OneBlockInfo> overflowBlocks = new ArrayList<OneBlockInfo>();
  Set<String> racks = new HashSet<String>();
 
  //This pass drains every unprocessed block in blockToNodes: a rack that can form a split
  //produces one; blocks that cannot form one are all moved into overflowBlocks.
  // Process all racks over and over again until there is no more work to do.
  while (blockToNodes.size() > 0) {
 
    // Create one split for this rack before moving over to the next rack.
    // Come back to this rack after creating a single split for each of the
    // remaining racks.
    // Process one rack location at a time, Combine all possible blocks that
    // reside on this rack as one split. (constrained by minimum and maximum
    // split size). iterate over all racks
    //iterate over the blocks rack by rack
    for (Iterator<Map.Entry<String, List<OneBlockInfo>>> iter =
         rackToBlocks.entrySet().iterator(); iter.hasNext();) {
 
      // /default-rack -> {ArrayList@10687}  size = 512
      Map.Entry<String, List<OneBlockInfo>> one = iter.next();
      //record this rack
      racks.add(one.getKey());
 
      //all blocks on this rack
      List<OneBlockInfo> blocks = one.getValue();
 
      // for each block, copy it into validBlocks. Delete it from
      // blockToNodes so that the same block does not appear in
      // two different splits.
      boolean createdSplit = false;
      for (OneBlockInfo oneblock : blocks) {
        //blockToNodes now holds only the blocks that are still unprocessed(!)
        if (blockToNodes.containsKey(oneblock)) {
          validBlocks.add(oneblock);
 
          //remove a block from blockToNodes once it has been handled
          blockToNodes.remove(oneblock);
 
          curSplitSize += oneblock.length;
 
          //if the unprocessed blocks accumulate to 256 MB, cut one split
          // if the accumulated split size exceeds the maximum, then create this split.
          if (maxSize != 0 && curSplitSize >= maxSize) {
            // create an input split and add it to the splits array
            addCreatedSplit(splits, getHosts(racks), validBlocks);
 
            createdSplit = true;
 
            //at most one split is created here per rack per iteration(!)
            break;
          }
        }
      }
 
      //if this rack produced a split, continue with the next rack
      // if we created a split, then just go to the next rack
      if (createdSplit) {
        curSplitSize = 0;
        validBlocks.clear();
        racks.clear();
        continue;
      }
 
      //if no split >= 256 MB could be formed, create a smaller one from these blocks
      if (!validBlocks.isEmpty()) {
        if (minSizeRack != 0 && curSplitSize >= minSizeRack) {
          // if there is a minimum size specified, then create a single split
          // otherwise, store these blocks into overflow data structure
          addCreatedSplit(splits, getHosts(racks), validBlocks);
        } else {
          //If the combined size of these blocks is too small, smaller than even the rack-level
          //minimum minSizeRack, put them into overflowBlocks and merge them into splits at the
          //very end. The goal is to avoid creating splits that are too small.
          // There were a few blocks in this rack that remained to be processed.
          // Keep them in 'overflow' block list. These will be combined later.
          overflowBlocks.addAll(validBlocks);
        }
      }
 
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
    }
  }
 
 
  //3. -------------------- Step 3: after step 2, only overflowBlocks may still hold unprocessed blocks; handle them all here --------------------
  assert blockToNodes.isEmpty();
  assert curSplitSize == 0;
  assert validBlocks.isEmpty();
  assert racks.isEmpty();
 
  //process the remaining, still-unassigned blocks
  // Process all overflow blocks
  for (OneBlockInfo oneblock : overflowBlocks) {
    validBlocks.add(oneblock);
    curSplitSize += oneblock.length;
 
    //record the racks of oneblock
    // This might cause an existing rack location to be re-added, but it should be ok.
    for (int i = 0; i < oneblock.racks.length; i++) {
      racks.add(oneblock.racks[i]);
    }
 
    //once 256 MB has accumulated, create a split
    // if the accumulated split size exceeds the maximum, then create this split.
    if (maxSize != 0 && curSplitSize >= maxSize) {
      // create an input split and add it to the splits array
      addCreatedSplit(splits, getHosts(racks), validBlocks);
      curSplitSize = 0;
      validBlocks.clear();
      racks.clear();
    }
  }
 
 
  //4. -------------------- Step 4: finally, if any blocks survive the first 3 steps, group them all into a single split of < 256 MB --------------------
  // Process any remaining blocks, if any.
  if (!validBlocks.isEmpty()) {
    addCreatedSplit(splits, getHosts(racks), validBlocks);
  }
}

Conclusion:

1. If the data is in HDFS, the nodes are the datanodes that hold the file's blocks, so there are many nodes and nodeToBlocks maps datanode -> blocks, which effectively spreads the blocks out.

2. If the data is on COS, the only node is localhost, so nodeToBlocks maps localhost -> all blocks; everything is grouped under one node. Since createSplits processes nodeToBlocks node by node first, and all the blocks are piled on localhost rather than spread out, it very likely generates a few large splits.

3. When the data is stored in HDFS, the blocks are scattered across multiple nodes, which tends to generate many small splits, hence the significant difference in split counts. The method makes 4 passes to create splits that are as large as possible: it first forms as many "big" splits as it can (splits >= maxSize, 256 MB by default), and whatever does not fit is simply grouped into whatever remains. That is why lowering mapred.max.split.size yields more splits, and therefore more mappers.
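
The localhost behavior can be verified directly. The sketch below (paths are hypothetical; cosn:// assumes the Hadoop-COS connector is installed) prints the block locations reported by the FileSystem: an hdfs:// path lists real datanode hosts, while an object store such as COS typically reports localhost for every block, which is exactly why nodeToBlocks collapses to a single node.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BlockLocationProbe {
  public static void main(String[] args) throws Exception {
    // e.g. hdfs://nn:8020/warehouse/test vs cosn://bucket/warehouse/test (hypothetical)
    Path dir = new Path(args[0]);
    FileSystem fs = dir.getFileSystem(new Configuration());
    for (FileStatus file : fs.listStatus(dir)) {
      for (BlockLocation loc : fs.getFileBlockLocations(file, 0, file.getLen())) {
        // HDFS prints real datanode hostnames; COS reports "localhost" for every block
        System.out.println(file.getPath() + " offset=" + loc.getOffset()
            + " length=" + loc.getLength()
            + " hosts=" + String.join(",", loc.getHosts()));
      }
    }
  }
}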

The solution

Lower the mapred.max.split.size value so that the split logic cuts more, smaller splits, which raises the mapper count.
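
For example, the value can be lowered per Hive session before running the query; 32 MB below is only an illustration, and the right value depends on the data volume and cluster capacity.

-- Illustrative value: lower the combine-split ceiling from the 256 MB default
-- to 32 MB so roughly 8x as many splits (and therefore mappers) are created.
SET mapred.max.split.size=33554432;
-- equivalent key on newer Hadoop versions:
SET mapreduce.input.fileinputformat.split.maxsize=33554432;

SELECT count(1) FROM test WHERE date='20180720';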