Iceberg reads data by implementing the DataSourceReader interface in Spark’s datasource V2. Here use DataSourceReader child interface SupportsScanColumnarBatch enableBatchRead and planInputPartitions method reads

/**
 * A mix-in interface for {@link DataSourceReader}. Data source readers can implement this
 * interface to output {@link ColumnarBatch} and make the scan faster.
 */
@InterfaceStability.Evolving
public interface SupportsScanColumnarBatch extends DataSourceReader {
  @Override
  default List<InputPartition<InternalRow>> planInputPartitions() {
    throw new IllegalStateException(
      "planInputPartitions not supported by default within SupportsScanColumnarBatch.");
  }

  /**
   * Similar to {@link DataSourceReader#planInputPartitions()}, but returns columnar data
   * in batches.
   */
  List<InputPartition<ColumnarBatch>> planBatchInputPartitions();

  /**
   * Returns true if the concrete data source reader can read data in batch according to the scan
   * properties like required columns, pushes filters, etc. It's possible that the implementation
   * can only support some certain columns with certain types. Users can overwrite this method and
   * {@link #planInputPartitions()} to fallback to normal read path under some conditions.
   */
  default boolean enableBatchRead(a) {
    return true; }}Copy the code

As shown above, enableBatchRead is a method that detects whether certain features such as required columns and intermediate filters are supported. If true is returned, the planInputPartitions() method is called to read data.

public boolean enableBatchRead(a) {
    if (readUsingBatch == null) {
      booleanallParquetFileScanTasks = tasks().stream() .allMatch(combinedScanTask -> ! combinedScanTask.isDataTask() && combinedScanTask.files() .stream() .allMatch(fileScanTask -> fileScanTask.file().format().equals( FileFormat.PARQUET)));boolean hasNoDeleteFiles = tasks().stream().noneMatch(TableScanUtil::hasDeletes);

      this.readUsingBatch = batchReadsEnabled && hasNoDeleteFiles && (allOrcFileScanTasks ||
          (allParquetFileScanTasks && atLeastOneColumn && onlyPrimitives));
    }
    return readUsingBatch;
  }
Copy the code

EnableBatchRead calls Tasks () to return a list of tasks to read data from

private List<CombinedScanTask> tasks(a) {
    if (tasks == null) {
      TableScan scan = table
          .newScan()
          .caseSensitive(caseSensitive)
          .project(lazySchema());

      try (CloseableIterable<CombinedScanTask> tasksIterable = scan.planTasks()) {
        this.tasks = Lists.newArrayList(tasksIterable);
      } catch (IOException e) {
        throw new RuntimeIOException(e, "Failed to close table scan: %s", scan); }}return tasks;
  }
Copy the code

Calling TableScan’s planTasks method returns an iterator of CombinedScanTask, implemented in BaseTableScan

public CloseableIterable<CombinedScanTask> planTasks(a) { Map<String, String> options = context.options(); . CloseableIterable<FileScanTask> fileScanTasks = planFiles(); CloseableIterable<FileScanTask> splitFiles = TableScanUtil.splitFiles(fileScanTasks, splitSize);return TableScanUtil.planTasks(splitFiles, splitSize, lookback, openFileCost);
  }

Copy the code

The planFiles() method is called here to return an iterator of FileScanTask

  public CloseableIterable<FileScanTask> planFiles(a) {
      return planFiles(ops, snapshot,
          context.rowFilter(), context.ignoreResiduals(), context.caseSensitive(), context.returnColumnStats());
  }
Copy the code

This calls the overloaded planFiles() method, which uses the template design pattern and is implemented in DataTableScan, a subclass of BaseTableScan

public CloseableIterable<FileScanTask> planFiles(TableOperations ops, Snapshot snapshot,
                                                   Expression rowFilter, boolean ignoreResiduals,
                                                   boolean caseSensitive, boolean colStats) {
    ManifestGroup manifestGroup = new ManifestGroup(ops.io(), snapshot.dataManifests(), snapshot.deleteManifests())
        .caseSensitive(caseSensitive)
        .select(colStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
        .filterData(rowFilter)
        .specsById(ops.current().specsById())
        .ignoreDeleted();

    if (ignoreResiduals) {
      manifestGroup = manifestGroup.ignoreResiduals();
    }

    if (PLAN_SCANS_WITH_WORKER_POOL && snapshot.dataManifests().size() > 1) {
      manifestGroup = manifestGroup.planWith(ThreadPools.getWorkerPool());
    }

    return manifestGroup.planFiles();
  }
Copy the code

Here the cacheManifests() method is called in snapshot.datamanifests ()

public List<ManifestFile> dataManifests(a) {
    if (dataManifests == null) {
      cacheManifests();
    }
    return dataManifests;
  }
Copy the code
private void cacheManifests(a) {
    if (allManifests == null) {
      // if manifests isn't set, then the snapshotFile is set and should be read to get the list
      this.allManifests = ManifestLists.read(io.newInputFile(manifestListLocation));
    }

    if (dataManifests == null || deleteManifests == null) {
      this.dataManifests = ImmutableList.copyOf(Iterables.filter(allManifests,
          manifest -> manifest.content() == ManifestContent.DATA));
      this.deleteManifests = ImmutableList.copyOf(Iterables.filter(allManifests, manifest -> manifest.content() == ManifestContent.DELETES)); }}Copy the code

If allManifests is empty read from the file, then divide into dataManifests and deleteManifests

Avro * avro * avro * avro * avro * avro * avro * avro * avro * avro * avro * avro * avro * avro From the file name, you can see that there is a mapping between the Manifest list file and the Manifest file

From the runtime data, we can also see that there are two Manifest files corresponding to a Manifest list, m1 ending in the delete file list and m0 ending in the data file list

Return to the planFiles method above, and finally call the ManifestGroup.planFiles () method

/**
   * Returns a iterable of scan tasks. It is safe to add entries of this iterable
   * to a collection as {@link DataFile} in each {@link FileScanTask} is defensively
   * copied.
   * @return a {@link CloseableIterable} of {@link FileScanTask}
   */
  public CloseableIterable<FileScanTask> planFiles(a) {
    LoadingCache<Integer, ResidualEvaluator> residualCache = Caffeine.newBuilder().build(specId -> {
      PartitionSpec spec = specsById.get(specId);
      Expression filter = ignoreResiduals ? Expressions.alwaysTrue() : dataFilter;
      return ResidualEvaluator.of(spec, filter, caseSensitive);
    });

    DeleteFileIndex deleteFiles = deleteIndexBuilder.build();

    boolean dropStats = ManifestReader.dropStats(dataFilter, columns);
    if(! deleteFiles.isEmpty()) { select(Streams.concat(columns.stream(), ManifestReader.STATS_COLUMNS.stream()).collect(Collectors.toList())); } Iterable<CloseableIterable<FileScanTask>> tasks = entries((manifest, entries) -> {int specId = manifest.partitionSpecId();
      PartitionSpec spec = specsById.get(specId);
      String schemaString = SchemaParser.toJson(spec.schema());
      String specString = PartitionSpecParser.toJson(spec);
      ResidualEvaluator residuals = residualCache.get(specId);
      if (dropStats) {
        return CloseableIterable.transform(entries, e -> new BaseFileScanTask(
            e.file().copyWithoutStats(), deleteFiles.forEntry(e), schemaString, specString, residuals));
      } else {
        return CloseableIterable.transform(entries, e -> newBaseFileScanTask( e.file().copy(), deleteFiles.forEntry(e), schemaString, specString, residuals)); }});if(executorService ! =null) {
      return new ParallelIterable<>(tasks, executorService);
    } else {
      returnCloseableIterable.concat(tasks); }}Copy the code

CloseableIterable = CloseableIterable fileIndex deleteFiles = deleteIndexBuilder.build(); Read delete file index

 DeleteFileIndex build(a) {
      // read all of the matching delete manifests in parallel and accumulate the matching files in a queue
      Queue<ManifestEntry<DeleteFile>> deleteEntries = new ConcurrentLinkedQueue<>();
      Tasks.foreach(deleteManifestReaders())
          .stopOnFailure().throwFailureWhenFinished()
          .executeWith(executorService)
          .run(deleteFile -> {
            try (CloseableIterable<ManifestEntry<DeleteFile>> reader = deleteFile) {
              for (ManifestEntry<DeleteFile> entry : reader) {
                // copy with stats for better filtering against data file statsdeleteEntries.add(entry.copy()); }}catch (IOException e) {
              throw new RuntimeIOException(e, "Failed to close"); }});// build a map from (specId, partition) to delete file entries
      ListMultimap<Pair<Integer, StructLikeWrapper>, ManifestEntry<DeleteFile>> deleteFilesByPartition =
          Multimaps.newListMultimap(Maps.newHashMap(), Lists::newArrayList);
      for (ManifestEntry<DeleteFile> entry : deleteEntries) {
        int specId = entry.file().specId();
        StructLikeWrapper wrapper = StructLikeWrapper.forType(specsById.get(specId).partitionType())
            .set(entry.file().partition());
        deleteFilesByPartition.put(Pair.of(specId, wrapper), entry);
      }

      // sort the entries in each map value by sequence number and split into sequence numbers and delete files lists
      Map<Pair<Integer, StructLikeWrapper>, Pair<long[], DeleteFile[]>> sortedDeletesByPartition = Maps.newHashMap();
      // also, separate out equality deletes in an unpartitioned spec that should be applied globally
      long[] globalApplySeqs = null;
      DeleteFile[] globalDeletes = null;
      for (Pair<Integer, StructLikeWrapper> partition : deleteFilesByPartition.keySet()) {
        if (specsById.get(partition.first()).isUnpartitioned()) {
          Preconditions.checkState(globalDeletes == null."Detected multiple partition specs with no partitions"); List<Pair<Long, DeleteFile>> eqFilesSortedBySeq = deleteFilesByPartition.get(partition).stream() .filter(entry -> entry.file().content()  == FileContent.EQUALITY_DELETES) .map(entry ->// a delete file is indexed by the sequence number it should be applied to
                  Pair.of(entry.sequenceNumber() - 1, entry.file()))
              .sorted(Comparator.comparingLong(Pair::first))
              .collect(Collectors.toList());

          globalApplySeqs = eqFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
          globalDeletes = eqFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

          List<Pair<Long, DeleteFile>> posFilesSortedBySeq = deleteFilesByPartition.get(partition).stream()
              .filter(entry -> entry.file().content() == FileContent.POSITION_DELETES)
              .map(entry -> Pair.of(entry.sequenceNumber(), entry.file()))
              .sorted(Comparator.comparingLong(Pair::first))
              .collect(Collectors.toList());

          long[] seqs = posFilesSortedBySeq.stream().mapToLong(Pair::first).toArray();
          DeleteFile[] files = posFilesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new);

          sortedDeletesByPartition.put(partition, Pair.of(seqs, files));

        } else {
          List<Pair<Long, DeleteFile>> filesSortedBySeq = deleteFilesByPartition.get(partition).stream()
              .map(entry -> {
                // a delete file is indexed by the sequence number it should be applied to
                long applySeq = entry.sequenceNumber() -
                    (entry.file().content() == FileContent.EQUALITY_DELETES ? 1 : 0);
                return Pair.of(applySeq, entry.file());
              })
              .sorted(Comparator.comparingLong(Pair::first))
              .collect(Collectors.toList());

          long[] seqs = filesSortedBySeq.stream().mapToLong(Pair::first).toArray();
          DeleteFile[] files = filesSortedBySeq.stream().map(Pair::second).toArray(DeleteFile[]::new); sortedDeletesByPartition.put(partition, Pair.of(seqs, files)); }}return new DeleteFileIndex(specsById, globalApplySeqs, globalDeletes, sortedDeletesByPartition);
    }
Copy the code

Here we first call deleteManifestReaders() to read the deleteManifest file

private Iterable<CloseableIterable<ManifestEntry<DeleteFile>>> deleteManifestReaders() {
      LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ? null :
          Caffeine.newBuilder().build(specId -> {
            PartitionSpec spec = specsById.get(specId);
            return ManifestEvaluator.forPartitionFilter(
                Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
                spec, caseSensitive);
          });

      Iterable<ManifestFile> matchingManifests = evalCache == null? deleteManifests : Iterables.filter(deleteManifests, manifest -> manifest.content() == ManifestContent.DELETES && (manifest.hasAddedFiles() || manifest.hasDeletedFiles()) &&  evalCache.get(manifest.partitionSpecId()).eval(manifest));return Iterables.transform(
          matchingManifests,
          manifest ->
              ManifestFiles.readDeleteManifest(manifest, io, specsById)
                  .filterRows(dataFilter)
                  .filterPartitions(partitionFilter)
                  .caseSensitive(caseSensitive)
                  .liveEntries()
      );
    }
Copy the code

DeleteManifests namely read deleteManifests file map in the last call ManifestFiles. Return to ManifestReader readDeleteManifest read the manifest file

public static ManifestReader<DeleteFile> readDeleteManifest(ManifestFile manifest, FileIO io, Map
       
         specsById)
       ,> {
    Preconditions.checkArgument(manifest.content() == ManifestContent.DELETES,
        "Cannot read a data manifest with a DeleteManifestReader: %s", manifest);
    InputFile file = io.newInputFile(manifest.path());
    InheritableMetadata inheritableMetadata = InheritableMetadataFactory.fromManifest(manifest);
    return new ManifestReader<>(file, specsById, inheritableMetadata, FileType.DELETE_FILES);
  }
Copy the code
protected ManifestReader(InputFile file, Map<Integer, PartitionSpec> specsById, InheritableMetadata inheritableMetadata, FileType content) { this.file = file; this.inheritableMetadata = inheritableMetadata; this.content = content; try { try (AvroIterable<ManifestEntry<F>> headerReader = Avro.read(file) .project(ManifestEntry.getSchema(Types.StructType.of()).select("status")) .build()) { this.metadata = headerReader.getMetadata(); } } catch (IOException e) { throw new RuntimeIOException(e); } int specId = TableMetadata.INITIAL_SPEC_ID; String specProperty = metadata.get("partition-spec-id"); if (specProperty ! = null) { specId = Integer.parseInt(specProperty); } if (specsById ! = null) { this.spec = specsById.get(specId); } else { Schema schema = SchemaParser.fromJson(metadata.get("schema")); this.spec = PartitionSpecParser.fromJsonFields(schema, specId, metadata.get("partition-spec")); } this.fileSchema = new Schema(DataFile.getType(spec.partitionType()).fields()); }Copy the code

The logic is a bit complicated here, but ultimately returns a DeleteFileIndex object deleteFiles, where the globalDeletes field is a list of equalityDelete files, The sortedDeletesByPartition field is the positionDelete file list

As shown in the figure, the first element of the Pair in sortedDeletesByPartition is the snapshot corresponding to the delete file, and the second element is the delete file

Continue with the following steps in the planFiles method. This calls entries to the iterator of FileScanTask. The argument to entries is a method with two arguments

private <T> Iterable<CloseableIterable<T>> entries(
            BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>> entryFn) {

      Iterable<ManifestFile> matchingManifests = evalCache == null ? dataManifests :
              Iterables.filter(dataManifests, manifest -> evalCache.get(manifest.partitionSpecId()).eval(manifest));

      matchingManifests = Iterables.filter(matchingManifests, manifestPredicate::test);

      return Iterables.transform(
              matchingManifests,
              manifest -> {
                ManifestReader<DataFile> reader = ManifestFiles.read(manifest, io, specsById)
                        .filterRows(dataFilter)
                        .filterPartitions(partitionFilter)
                        .caseSensitive(caseSensitive)
                        .select(columns);

                CloseableIterable<ManifestEntry<DataFile>> entries = reader.entries();

                entries = CloseableIterable.filter(entries, manifestEntryPredicate);
                return entryFn.apply(manifest, entries);
              });
  }
Copy the code

The Manifestfiles. read method is called to read the Manifest file, then the ManifestEntry iterator is obtained, and finally the BaseFileScanTask is generated by applying the parameter methods passed into entries

BaseFileScanTask(DataFile file, DeleteFile[] deletes, String schemaString, String specString,
                   ResidualEvaluator residuals) {
    this.file = file;
    this.deletes = deletes ! =null ? deletes : new DeleteFile[0];
    this.schemaString = schemaString;
    this.specString = specString;
    this.residuals = residuals;
  }
Copy the code

The file is a data file, and the DELETES are a list of DELETE files, which includes all DELETE files with an order number larger than the DATA file

The second parameter to the BaseFileScanTask, the delete file, is obtained by the deletefiles.forentry (e) method

DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
    return this.forDataFile(entry.sequenceNumber(), (DataFile)entry.file());
}

DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
    Pair<Integer, StructLikeWrapper> partition = this.partition(file.specId(), file.partition());
    Pair<long[], DeleteFile[]> partitionDeletes = (Pair)this.sortedDeletesByPartition.get(partition);
    Stream matchingDeletes;
    if (partitionDeletes == null) {
        matchingDeletes = limitBySequenceNumber(sequenceNumber, this.globalSeqs, this.globalDeletes);
    } else if (this.globalDeletes == null) {
        matchingDeletes = limitBySequenceNumber(sequenceNumber, (long[])partitionDeletes.first(), (DeleteFile[])partitionDeletes.second());
    } else {
        matchingDeletes = Stream.concat(limitBySequenceNumber(sequenceNumber, this.globalSeqs, this.globalDeletes), limitBySequenceNumber(sequenceNumber, (long[])partitionDeletes.first(), (DeleteFile[])partitionDeletes.second()));
    }

    return (DeleteFile[])matchingDeletes.filter((deleteFile) -> {
        return canContainDeletesForFile(file, deleteFile, ((PartitionSpec)this.specsById.get(file.specId())).schema());
    }).toArray((x$0) - > {return new DeleteFile[x$0];
    });
}
Copy the code

After enableBatchRead is run, false….. is returned Of course, data is still read through the planInputPartitions method

/**
   * This is called in the Spark Driver when data is to be materialized into {@link InternalRow}
   */
  @Override
  public List<InputPartition<InternalRow>> planInputPartitions() {
    String tableSchemaString = SchemaParser.toJson(table.schema());
    String expectedSchemaString = SchemaParser.toJson(lazySchema());
    String nameMappingString = table.properties().get(DEFAULT_NAME_MAPPING);

    List<InputPartition<InternalRow>> readTasks = Lists.newArrayList();
    for (CombinedScanTask task : tasks()) {
      readTasks.add(new ReadTask<>(
          task, tableSchemaString, expectedSchemaString, nameMappingString, io, encryptionManager, caseSensitive,
          localityPreferred, InternalRowReaderFactory.INSTANCE));
    }

    return readTasks;
  }
Copy the code

This method is executed on the driver side, again calling Tasks () to generate the CombinedScanTask task, and then generating ReadTask, an internal class of Reader that inherits spark’s InputPartition interface


/**
 * An input partition returned by {@linkDataSourceReader#planInputPartitions()} and is * responsible for creating the actual data reader of one RDD partition. *  The relationship between {@link InputPartition} and {@link InputPartitionReader}
 * is similar to the relationship between {@link Iterable} and {@link java.util.Iterator}.
 *
 * Note that {@link InputPartition}s will be serialized and sent to executors, then
 * {@link InputPartitionReader}s will be created on executors to do the actual reading. So
 * {@link InputPartition} must be serializable while {@link InputPartitionReader} doesn't need to
 * be.
 */
@InterfaceStability.Evolving
public interface InputPartition<T> extends Serializable {

  /** * The preferred locations where the input partition reader returned by this partition can run * faster, but Spark does not guarantee to run the input partition reader on these locations. * The implementations should make sure that it can be run on any location. * The location is a string representing the host name. * * Note that if a host name cannot be recognized by Spark, it will be ignored as it was not in * the returned locations. The default return value is empty string array, which means this * input partition's reader has no location preference. * * If this method fails (by throwing an exception), the action will fail and no Spark job will be * submitted. */
  default String[] preferredLocations() {
    return new String[0];
  }

  /** * Returns an input partition reader to do the actual reading work. * * If this method fails (by throwing an exception), the corresponding Spark task would fail and * get retried until hitting the maximum retry times. */
  InputPartitionReader<T> createPartitionReader(a);
}
Copy the code

The relationship between InputPartition and InputPartitionReader is a bit like that between Iterable and Iterator, where Iterator interfaces include iterators like Next The Iterable interface contains iterator methods that return an Iterator interface. When a collection such as ArrayList implements Iterable, it implements the iterator interface internally through an inner class. Then the implementation of the Iterator method returns the implementation class of the Iterator interface

InputPartition is also implemented inside Reader via ReadTask, And the method that implements InputPartition createPartitionReader returns the implementation class of the InputPartitionReader interface and in this case the RowReader object that is returned implements the InputPartitionReader interface, It contains concrete iterators and methods to read data. Next and GET methods in InputPartitionReader interface are implemented in its parent class BaseDataReader

The next method in BaseDataReader is called when the data is read

public boolean next(a) throws IOException {
      while (true) {
        if (currentIterator.hasNext()) {
          this.current = currentIterator.next();
          return true;
        } else if (tasks.hasNext()) {
          this.currentIterator.close();
          this.currentTask = tasks.next();
          this.currentIterator = open(currentTask);
        } else {
          this.currentIterator.close();
          return false; }}}}Copy the code

The open method is called to generate the currentIterator iterator. Implementation in the RowDataReader, returns a CloseableIterator object

CloseableIterator<InternalRow> open(FileScanTask task) {
    SparkDeleteFilter deletes = new SparkDeleteFilter(task, tableSchema, expectedSchema);

    // schema or rows returned by readersSchema requiredSchema = deletes.requiredSchema(); Map<Integer, ? > idToConstant = PartitionUtil.constantsMap(task, RowDataReader::convertConstant); DataFile file = task.file();// update the current file for Spark's filename() function
    InputFileBlockHolder.set(file.path().toString(), task.start(), task.length());

    return deletes.filter(open(task, requiredSchema, idToConstant)).iterator();
  }
Copy the code

The delst.filter method is called here to filter iterators such as newParquetIterable for specific files returned by Open (Task, requiredSchema, idToConstant)

public CloseableIterable<T> filter(CloseableIterable<T> records) {
    return applyEqDeletes(applyPosDeletes(records));
  }
Copy the code

The filter method is supported by applyPosDeletes for position-delete, and then applyEqDeletes for equality delete

private CloseableIterable<T> applyPosDeletes(CloseableIterable<T> records) {
    if (posDeletes.isEmpty()) {
      return records;
    }

    List<CloseableIterable<Record>> deletes = Lists.transform(posDeletes, this::openPosDeletes);

    // if there are fewer deletes than a reasonable number to keep in memory, use a set
    if (posDeletes.stream().mapToLong(DeleteFile::recordCount).sum() < setFilterThreshold) {
      return Deletes.filter(
          records, this::pos,
          Deletes.toPositionSet(dataFile.path(), CloseableIterable.concat(deletes)));
    }

    return Deletes.streamingFilter(records, this::pos, Deletes.deletePositions(dataFile.path(), deletes));
  }
Copy the code

Deletes. DeletePositions returns an iterator of deletePositions

public static <T extends StructLike> CloseableIterable<Long> deletePositions(CharSequence dataLocation, List
       
        > deleteFiles)
        {
    DataFileFilter<T> locationFilter = new DataFileFilter<>(dataLocation);
    List<CloseableIterable<Long>> positions = Lists.transform(deleteFiles, deletes ->
        CloseableIterable.transform(locationFilter.filter(deletes), row -> (Long) POSITION_ACCESSOR.get(row)));

    return new SortedMerge<>(Long::compare, positions);
  }
Copy the code

SortedMerge created a deletePositions heap, and then through the poll method iteration And in return achieved CloseableIterable interface PositionStreamDeleteFilter object, The PositionFilterIterator class implements the FilterIterator interface.

protected boolean shouldKeep(T row) {
  long currentPos = extractPos.apply(row);
  if (currentPos < nextDeletePos) {
    return true;
  }
  // consume delete positions until the next is past the current position
  booleankeep = currentPos ! = nextDeletePos;while (deletePosIterator.hasNext() && nextDeletePos <= currentPos) {
    this.nextDeletePos = deletePosIterator.next();
    if (keep && currentPos == nextDeletePos) {
      // if any delete position matches the current position, discard
      keep = false; }}return keep;
}

Copy the code

ShouldKeep is used to filter the sorted DELETE iterator. The data in PosDeleteFile is only the filename and the line number of the data to be deleted