TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg2");
tableLoader.open();
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
         .rewriteDataFiles()
         .execute();

The snippet above is the entry point for Flink's small-file merge; the Actions class it uses looks like this:

public class Actions {

  public static final Configuration CONFIG = new Configuration()
      // disable classloader check as Avro may cache class/object in the serializers.
      .set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);

  private StreamExecutionEnvironment env;
  private Table table;

  private Actions(StreamExecutionEnvironment env, Table table) {
    this.env = env;
    this.table = table;
  }

  public static Actions forTable(StreamExecutionEnvironment env, Table table) {
    return new Actions(env, table);
  }

  public static Actions forTable(Table table) {
    return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
  }

  public RewriteDataFilesAction rewriteDataFiles() {
    return new RewriteDataFilesAction(env, table);
  }
}

This Actions class roughly mirrors the Actions structure in Spark. Its constructor is private, so the static forTable methods are provided to pass in the table and instantiate the class. Calling rewriteDataFiles() returns a RewriteDataFilesAction object, which implements the small-file merge; calling its execute() method starts the merge. execute() is implemented in BaseRewriteDataFilesAction, the parent class of RewriteDataFilesAction, and contains the main merge logic.
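Before execute() runs, BaseRewriteDataFilesAction also exposes a few tuning knobs. The sketch below is an assumption-level illustration, not code from this article: the method names follow the Iceberg 0.11/0.12-era action API and may differ across versions, and the 128 MB target and the filter are just examples.

import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.actions.Actions;

// Hedged sketch: configure the rewrite before executing it. targetSizeInBytes/filter are
// assumed to exist on BaseRewriteDataFilesAction in this Iceberg version; verify against
// your dependency before copying.
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .targetSizeInBytes(128 * 1024 * 1024L)   // merge small files toward ~128 MB outputs
    .filter(Expressions.alwaysTrue())        // optionally narrow the rewrite with a filter expression
    .execute();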

public RewriteDataFilesActionResult execute() {
    CloseableIterable<FileScanTask> fileScanTasks = null;
    try {
      fileScanTasks = table.newScan()
          .caseSensitive(caseSensitive)
          .ignoreResiduals()
          .filter(filter)
          .planFiles();
    } finally {
      try {
        if (fileScanTasks != null) {
          fileScanTasks.close();
        }
      } catch (IOException ioe) {
        LOG.warn("Failed to close task iterable", ioe);
      }
    }

    Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
    Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
        .filter(kv -> kv.getValue().size() > 1)
        .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));

    // Nothing to rewrite if there's only one DataFile in each partition.
    if (filteredGroupedTasks.isEmpty()) {
      return RewriteDataFilesActionResult.empty();
    }
    // Split and combine tasks under each partition
    List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
        .map(scanTasks -> {
          CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
              CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
          return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
        })
        .flatMap(Streams::stream)
        .filter(task -> task.files().size() > 1 || isPartialFileScan(task))
        .collect(Collectors.toList());

    if (combinedScanTasks.isEmpty()) {
      return RewriteDataFilesActionResult.empty();
    }

    List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
    List<DataFile> currentDataFiles = combinedScanTasks.stream()
        .flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
        .collect(Collectors.toList());
    replaceDataFiles(currentDataFiles, addedDataFiles);

    return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
  }

The planFiles() method was analyzed in the previous article on Iceberg data reading. It returns an iterator of FileScanTask, each of which contains a data file plus the DELETE files that must be applied to it. The tasks are then grouped by partition and turned into a list of CombinedScanTasks: splitFiles splits files according to the target file size, and planTasks produces the tasks whose files will be merged. The resulting tasks are then filtered by a few conditions; for example, task.files().size() > 1 keeps a task only when it would actually merge more than one file. The handling is somewhat different for the V2 format, which supports DELETE files.
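As a rough illustration of the grouping step, here is a simplified sketch, not the actual groupTasksByPartition implementation; the real code wraps each partition struct in a StructLikeWrapper so it hashes and compares correctly.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.StructLike;

// Group the planned FileScanTasks by their partition value.
Map<StructLike, List<FileScanTask>> grouped = new HashMap<>();
for (FileScanTask task : fileScanTasks) {
  grouped.computeIfAbsent(task.file().partition(), p -> new ArrayList<>()).add(task);
}
// Partitions with only a single data file have nothing to merge and are dropped,
// which is what the size() > 1 filter above expresses.
grouped.values().removeIf(tasks -> tasks.size() <= 1);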

In the example at the beginning of this article, the table has five data files, so five FileScanTasks are planned.

The generated combinedScanTasks list contains two tasks, each of which holds two FileScanTasks, so four files will be merged. The remaining single file is filtered out by the "more than one file per task" condition. The replaceDataFiles method then rewrites the metadata:

 public void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
    try {
      RewriteFiles rewriteFiles = table.newRewrite();
      rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
      commit(rewriteFiles);
    } catch (Exception e) {
      Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
          .noRetry()
          .suppressFailureWhenFinished()
          .onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
          .run(fileIO::deleteFile);
      throw e;
    }
  }

RewriteFiles rewriteFiles = table.newRewrite() returns a RewriteFiles object built from the table, containing the table name and the TableOperations information, which in turn holds the current version of the table. Note that this is the table state at the moment the small-file merge started, not necessarily the current table version at the moment the rewrite commits.
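Expressed directly against the Table API, the replace step boils down to something like the following minimal sketch, mirroring replaceDataFiles above; retries and failure cleanup are omitted.

import java.util.Set;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.RewriteFiles;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;

Set<DataFile> filesToDelete = Sets.newHashSet(currentDataFiles);  // the small files being compacted away
Set<DataFile> filesToAdd = Sets.newHashSet(addedDataFiles);       // the newly written, larger files

RewriteFiles rewrite = table.newRewrite();
rewrite.rewriteFiles(filesToDelete, filesToAdd);
rewrite.commit();  // produces a new "replace" snapshot on the table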

newRewrite() returns a BaseRewriteFiles object. BaseRewriteFiles extends MergingSnapshotProducer, which in turn extends SnapshotProducer. At construction time some state of the parent SnapshotProducer class is initialized, including one important piece of information, commitUUID:

private final String commitUUID = UUID.randomUUID().toString();

It represents the UUID for this commit, and the BaseRewriteFiles object represents the snapshot update produced by this rewrite operation.

Four kinds of files are recorded in the RewriteFiles object: 1. data files to be deleted; 2. delete files to be deleted; 3. data files to be added; 4. delete files to be added. RewriteFiles inherits from SnapshotUpdate, so it is a snapshot update operation. Its implementation class is BaseRewriteFiles, which extends the abstract class MergingSnapshotProducer, which itself extends the abstract class SnapshotProducer.
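A rough sketch of this class hierarchy (simplified and trimmed: generics, other members, and methods are omitted; this is an outline based on the walkthrough rather than the exact source):

// Simplified hierarchy sketch.
interface PendingUpdate<T> { T apply(); void commit(); }
interface SnapshotUpdate<ThisT> extends PendingUpdate<Snapshot> { /* stageOnly(), deleteWith(), ... */ }
interface RewriteFiles extends SnapshotUpdate<RewriteFiles> {
  RewriteFiles rewriteFiles(Set<DataFile> filesToDelete, Set<DataFile> filesToAdd);
}

abstract class SnapshotProducer<ThisT> implements SnapshotUpdate<ThisT> {
  // commit() and apply() below live here; holds commitUUID, manifestLists, ...
}
abstract class MergingSnapshotProducer<ThisT> extends SnapshotProducer<ThisT> {
  // apply(TableMetadata base): filters and merges manifests (shown later)
}
class BaseRewriteFiles extends MergingSnapshotProducer<RewriteFiles> implements RewriteFiles {
  // records the four kinds of files listed above
}

The commit() method inherited from SnapshotProducer is what actually publishes the new snapshot; its implementation is shown next.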

@Override
  public void commit() {
    // this is always set to the latest commit attempt's snapshot id.
    AtomicLong newSnapshotId = new AtomicLong(-1L);
    try {
      Tasks.foreach(ops)
          .retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
          .exponentialBackoff(
              base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
              base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
              2.0 /* exponential */)
          .onlyRetryOn(CommitFailedException.class)
          .run(taskOps -> {
            Snapshot newSnapshot = apply();
            newSnapshotId.set(newSnapshot.snapshotId());
            TableMetadata updated;
            if (stageOnly) {
              updated = base.addStagedSnapshot(newSnapshot);
            } else {
              updated = base.replaceCurrentSnapshot(newSnapshot);
            }

            if (updated == base) {
              // do not commit if the metadata has not changed. for example, this may happen when setting the current
              // snapshot to an ID that is already current. note that this check uses identity.
              return;
            }

            // if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
            // to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
            taskOps.commit(base, updated.withUUID());
          });

    } catch (CommitStateUnknownException commitStateUnknownException) {
      throw commitStateUnknownException;
    } catch (RuntimeException e) {
      Exceptions.suppressAndThrow(e, this::cleanAll);
    }

    LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());

    try {
      // at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
      // id in case another commit was added between this commit and the refresh.
      Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
      if (saved != null) {
        cleanUncommitted(Sets.newHashSet(saved.allManifests()));
        // also clean up unused manifest lists created by multiple attempts
        for (String manifestList : manifestLists) {
          if (!saved.manifestListLocation().equals(manifestList)) {
            deleteFile(manifestList);
          }
        }
      } else {
        // saved may not be present if the latest metadata couldn't be loaded due to eventual
        // consistency problems in refresh. in that case, don't clean up.
        LOG.warn("Failed to load committed snapshot, skipping manifest clean-up"); }}catch (RuntimeException e) {
      LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
    }

    notifyListeners();
  }

commit() calls apply() to produce a new Snapshot; apply() is implemented in the SnapshotProducer class:

  public Snapshot apply() {
    this.base = refresh();
    Long parentSnapshotId = base.currentSnapshot() != null ?
        base.currentSnapshot().snapshotId() : null;
    long sequenceNumber = base.nextSequenceNumber();

    // run validations from the child operation
    validate(base);

    List<ManifestFile> manifests = apply(base);

    if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
      OutputFile manifestList = manifestListPath();

      try (ManifestListWriter writer = ManifestLists.write(
          ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {

        // keep track of the manifest lists created
        manifestLists.add(manifestList.location());

        ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];

        Tasks.range(manifestFiles.length)
            .stopOnFailure().throwFailureWhenFinished()
            .executeWith(ThreadPools.getWorkerPool())
            .run(index ->
                manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));

        writer.addAll(Arrays.asList(manifestFiles));

      } catch (IOException e) {
        throw new RuntimeIOException(e, "Failed to write manifest list file");
      }

      return new BaseSnapshot(ops.io(),
          sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
          manifestList.location());

    } else {
      return new BaseSnapshot(ops.io(),
          snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), manifests);
    }
  }

The refresh() method is called first; for a Hadoop table it ultimately lands in the refresh() implementation of HadoopTableOperations:

public TableMetadata refresh() {
    int ver = version != null ? version : findVersion();
    try {
      Path metadataFile = getMetadataFile(ver);
      if (version == null && metadataFile == null && ver == 0) {
        // no v0 metadata means the table doesn't exist yet
        return null;
      } else if (metadataFile == null) {
        throw new ValidationException("Metadata file for version %d is missing", ver);
      }

      Path nextMetadataFile = getMetadataFile(ver + 1);
      while (nextMetadataFile != null) {
        ver += 1;
        metadataFile = nextMetadataFile;
        nextMetadataFile = getMetadataFile(ver + 1);
      }

      updateVersionAndMetadata(ver, metadataFile.toString());

      this.shouldRefresh = false;
      return currentMetadata;
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to refresh the table"); }}Copy the code

As you can see, the loop keeps probing for the next version number and continues as long as that metadata file exists, so it ends on the latest version; the latest metadata is then loaded, assigned to currentMetadata, and returned.
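For intuition, a hypothetical layout for the file:///tmp/iceberg2 example is sketched below; the loop above simply walks forward until the next version file is missing.

// Hypothetical contents of file:///tmp/iceberg2/metadata/ after three commits:
//   v1.metadata.json
//   v2.metadata.json
//   v3.metadata.json           <- refresh() ends here and loads it as currentMetadata
//   version-hint.text          <- best-effort pointer written by writeVersionHint(), contains "3"
//   snap-*.avro, *-m0.avro     <- manifest lists and manifests
// Starting from the known/hinted version, getMetadataFile(ver + 1) is probed until it
// returns null, so the newest vN.metadata.json wins even if the hint is stale.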

Back in the apply() method of the SnapshotProducer class, apply(TableMetadata base) is called; it contains the logic that writes the ManifestFiles to disk and returns a List&lt;ManifestFile&gt;. This apply(base) method is implemented in the MergingSnapshotProducer abstract class:

public List<ManifestFile> apply(TableMetadata base) {
    Snapshot current = base.currentSnapshot();

    // current.dataManifests() returns the latest data manifests; filterManifests then decides which of them to retain
    List<ManifestFile> filtered = filterManager.filterManifests(
        base.schema(), current != null ? current.dataManifests() : null);
    // Iterate over all reserved dataManifests to get the smallest SequenceNumber
    long minDataSequenceNumber = filtered.stream()
        .map(ManifestFile::minSequenceNumber)
        .filter(seq -> seq > 0) // filter out unassigned sequence numbers in rewritten manifests
        .reduce(base.lastSequenceNumber(), Math::min);
    // Drop delete files older than the smallest data sequence number; this only sets the filter criteria
    deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
    List<ManifestFile> filteredDeletes = deleteFilterManager.filterManifests(
        base.schema(), current != null ? current.deleteManifests() : null);

    // only keep manifests that have live data files or that were written by this commit
    Predicate<ManifestFile> shouldKeep = manifest ->
        manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId();
    Iterable<ManifestFile> unmergedManifests = Iterables.filter(
        Iterables.concat(prepareNewManifests(), filtered), shouldKeep);
    Iterable<ManifestFile> unmergedDeleteManifests = Iterables.filter(
        Iterables.concat(prepareDeleteManifests(), filteredDeletes), shouldKeep);

    // update the snapshot summary
    summaryBuilder.clear();
    summaryBuilder.merge(addedFilesSummary);
    summaryBuilder.merge(appendedManifestsSummary);
    summaryBuilder.merge(filterManager.buildSummary(filtered));
    summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));

    List<ManifestFile> manifests = Lists.newArrayList();
    Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
    Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));

    return manifests;
  }

The filterManifests method of filterManager is called to filter the data ManifestFiles, keeping the ones to be retained:

/**
   * Filter deleted files out of a list of manifests.
   *
   * @param tableSchema the current table schema
   * @param manifests a list of manifests to be filtered
   * @return an array of filtered manifests
   */
  List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
    if (manifests == null || manifests.isEmpty()) {
      validateRequiredDeletes();
      return ImmutableList.of();
    }

    // use a common metrics evaluator for all manifests because it is bound to the table schema
    StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);

    ManifestFile[] filtered = new ManifestFile[manifests.size()];
    // open all of the manifest files in parallel, use index to avoid reordering
    // Each ManifestFile is filtered by a separate call to filterManifest; after the corresponding files are dropped,
    // the rewritten manifest is written to disk and a new, filtered ManifestFile object is returned
    Tasks.range(filtered.length)
        .stopOnFailure().throwFailureWhenFinished()
        .executeWith(ThreadPools.getWorkerPool())
        .run(index -> {
          ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
          filtered[index] = manifest;
        });

    validateRequiredDeletes(filtered);

    return Arrays.asList(filtered);
  }

deletePaths is a member variable of the ManifestFilterManager class; it holds all the data files that the rewrite needs to delete.

validateRequiredDeletes then re-validates the filtered manifests, checking that every path in deletePaths is included among the deleted files of the new ManifestFiles:


  /**
   * Throws a {@link ValidationException} if any deleted file was not present in a filtered manifest.
   *
   * @param manifests a set of filtered manifests
   */
  private void validateRequiredDeletes(ManifestFile... manifests) {
    if (failMissingDeletePaths) {
      Set<CharSequence> deletedFiles = deletedFiles(manifests);
      ValidationException.check(deletedFiles.containsAll(deletePaths),
          "Missing required files to delete: %s",
          COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
    }
  }


The filterManifest method is then called to filter the Manifest file

  /**
   * @return a ManifestReader that is a filtered version of the input manifest.
   */
  private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
    ManifestFile cached = filteredManifests.get(manifest);
    if (cached != null) {
      return cached;
    }

    boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
    if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
      filteredManifests.put(manifest, manifest);
      return manifest;
    }

    try (ManifestReader<F> reader = newManifestReader(manifest)) {
      // this assumes that the manifest doesn't have files to remove and streams through the
      // manifest without copying data. if a manifest does have a file to remove, this will break
      // out of the loop and move on to filtering the manifest.
      boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader);
      if (!hasDeletedFiles) {
        filteredManifests.put(manifest, manifest);
        return manifest;
      }

      return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);

    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); }}Copy the code

The resource declared inside the try (...) parentheses must implement the java.io.Closeable (or AutoCloseable) interface; it is closed automatically when the block exits. The ManifestReader class implements java.io.Closeable, so it can be used this way.
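A minimal try-with-resources example (the manifest and table objects here are stand-ins, not from the article's test setup):

import java.io.IOException;
import org.apache.iceberg.DataFile;
import org.apache.iceberg.ManifestFile;
import org.apache.iceberg.ManifestFiles;
import org.apache.iceberg.ManifestReader;
import org.apache.iceberg.Table;

// Any resource declared in the parentheses that implements AutoCloseable/Closeable is
// closed automatically when the block exits, even if the body throws.
void printManifest(Table table, ManifestFile manifest) throws IOException {
  try (ManifestReader<DataFile> reader = ManifestFiles.read(manifest, table.io())) {
    for (DataFile file : reader) {
      System.out.println(file.path());
    }
  } // reader.close() is called implicitly here
}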

The newManifestReader(manifest) method opens the Avro file to be read; ultimately org.apache.iceberg.avro.AvroIterable calls the DataFileReader.openReader method to open the file. In this example the four Avro manifest files are opened and some metadata information is initialized.

It then calls manifestHasDeletedFiles to check whether the manifest contains files that need to be deleted:

  private boolean manifestHasDeletedFiles(
      StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader) {
    boolean isDelete = reader.isDeleteManifestReader();
    Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
    Evaluator strict = strictDeleteEvaluator(reader.spec());
    boolean hasDeletedFiles = false;
    for (ManifestEntry<F> entry : reader.entries()) {
      F file = entry.file();
      boolean fileDelete = deletePaths.contains(file.path()) ||
          dropPartitions.contains(file.specId(), file.partition()) ||
          (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
      if (fileDelete || inclusive.eval(file.partition())) {
        ValidationException.check(
            fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
            "Cannot delete file where some, but not all, rows match filter %s: %s".this.deleteExpression, file.path());

        hasDeletedFiles = true;
        if (failAnyDelete) {
          throw new DeleteException(reader.spec().partitionToPath(file.partition()));
        }
        break; // as soon as a deleted file is detected, stop scanning
      }
    }
    return hasDeletedFiles;
  }


A file is judged to be deleted here under three conditions: (1) deletePaths.contains checks whether the file is in deletePaths, which holds only the data files to be deleted; (2) dropPartitions.contains checks whether the file's partition has been dropped; (3) if the entry is a delete file, its sequenceNumber must be greater than 0 and smaller than minSequenceNumber. minSequenceNumber is effectively table-wide: it is the smallest sequence number among the data files that are not deleted. If a delete file's sequence number is smaller than that, the delete file no longer applies to any remaining data file and can itself be removed.
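A small worked example of condition (3), under the assumption stated above that a delete file only matters for data files with smaller-or-equal sequence numbers:

// Suppose after the rewrite the smallest sequence number among retained data files is 7.
long minSequenceNumber = 7L;

// A delete file written at sequence number 5 could only affect data files written at or
// before sequence 5; all of those have been rewritten away, so the delete file is dropped.
long staleDeleteSeq = 5L;
boolean dropStale = staleDeleteSeq > 0 && staleDeleteSeq < minSequenceNumber;   // true

// A delete file written at sequence number 9 may still apply to live data and is kept.
long liveDeleteSeq = 9L;
boolean dropLive = liveDeleteSeq > 0 && liveDeleteSeq < minSequenceNumber;      // false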

At the end of filterManifest, filterManifestWithDeletedFiles is invoked to rewrite the manifest file:

  private ManifestFile filterManifestWithDeletedFiles(
      StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader) {
    boolean isDelete = reader.isDeleteManifestReader();
    Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
    Evaluator strict = strictDeleteEvaluator(reader.spec());
    // when this point is reached, there is at least one file that will be deleted in the
    // manifest. produce a copy of the manifest with all deleted files removed.
    List<F> deletedFiles = Lists.newArrayList();
    Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();

    try {
      ManifestWriter<F> writer = newManifestWriter(reader.spec());
      try {
        reader.entries().forEach(entry -> {
          F file = entry.file();
          boolean fileDelete = deletePaths.contains(file.path()) ||
              dropPartitions.contains(file.specId(), file.partition()) ||
              (isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
          if (entry.status() != ManifestEntry.Status.DELETED) {
            if (fileDelete || inclusive.eval(file.partition())) {
              ValidationException.check(
                  fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
                  "Cannot delete file where some, but not all, rows match filter %s: %s".this.deleteExpression, file.path());

              writer.delete(entry);

              CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
              if (deletedPaths.contains(wrapper)) {
                LOG.warn("Deleting a duplicate path from manifest {}: {}",
                    manifest.path(), wrapper.get());
                duplicateDeleteCount += 1;
              } else {
                // only add the file to deletes if it is a new delete
                // this keeps the snapshot summary accurate for non-duplicate data
                deletedFiles.add(entry.file().copyWithoutStats());
              }
              deletedPaths.add(wrapper);

            } else {
              writer.existing(entry);
            }
          }
        });
      } finally {
        writer.close();
      }

      // return the filtered manifest as a reader
      ManifestFile filtered = writer.toManifestFile();

      // update caches
      filteredManifests.put(manifest, filtered);
      filteredManifestToDeletedFiles.put(filtered, deletedFiles);

      return filtered;

    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to close manifest writer"); }}Copy the code

ManifestWriter&lt;F&gt; writer = newManifestWriter(reader.spec()) constructs a new ManifestWriter from information in the ManifestReader and is used to rewrite the entries into a new Avro file.

    @Override
    protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
      return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
    }
  protected OutputFile newManifestOutput() {
    return ops.io().newOutputFile(
        ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
  }

  protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
    return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
  }

The newManifestOutput() method creates the output file; the Avro file is named with the commitUUID plus "-m" plus an incrementing sequence number.
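For example, with a hypothetical UUID, the naming logic in newManifestOutput() produces file names like these:

import java.util.concurrent.atomic.AtomicInteger;
import org.apache.iceberg.FileFormat;

String commitUUID = "0b1c2d3e-4f56-7890-abcd-ef1234567890";  // hypothetical; normally UUID.randomUUID()
AtomicInteger manifestCount = new AtomicInteger(0);

String first = FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement());
String second = FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement());
// first  -> "0b1c2d3e-4f56-7890-abcd-ef1234567890-m0.avro"
// second -> "0b1c2d3e-4f56-7890-abcd-ef1234567890-m1.avro"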

writer.delete(entry) records the entry of the file to be deleted:

   void delete(ManifestEntry<F> entry) {
    // Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
    // when this Snapshot has been removed or when there are no Snapshots older than this one.
    addEntry(reused.wrapDelete(snapshotId, entry.file()));
  }

Here the data file to be deleted is added to the current ManifestWriter with the addEntry method, using the current snapshotId. Finally, the add method of org.apache.iceberg.avro.AvroFileAppender calls writer.append to write the data.

Back in the apply() method of SnapshotProducer, manifests now holds all the manifest files. The sequenceNumber field of newly generated manifests has not been initialized yet, so it is -1 (unassigned); manifests that were not rewritten keep their original sequenceNumber. A ManifestListWriter is then created to write the snap-*.avro manifest list file.

The manifests are copied into the manifestFiles array and then added to the writer via writer.addAll(Arrays.asList(manifestFiles)):

  @Override
  public void addAll(Iterable<ManifestFile> values) {
    values.forEach(this::add);
  }
  @Override
  public void add(ManifestFile manifest) {
    writer.add(prepare(manifest));
  }
    @Override
    protected ManifestFile prepare(ManifestFile manifest) {
      return wrapper.wrap(manifest);
    }

The prepare(manifest) method wraps the ManifestFile in an IndexedManifestFile, which also carries the commitSnapshotId and sequenceNumber of this commit:

    private final long commitSnapshotId;
    private final long sequenceNumber;
    private ManifestFile wrapped = null;

    IndexedManifestFile(long commitSnapshotId, long sequenceNumber) {
      this.commitSnapshotId = commitSnapshotId;
      this.sequenceNumber = sequenceNumber;
    }

    public ManifestFile wrap(ManifestFile file) {
      this.wrapped = file;
      return this;
    }

The writer used in the add(ManifestFile manifest) method is a FileAppender; its implementation class is AvroFileAppender, which writes the ManifestFile records into the manifest list file:

  @Override
  public void add(D datum) {
    try {
      numRecords += 1L;
      writer.append(datum);
    } catch (IOException e) {
      throw new RuntimeIOException(e);
    }
  }

The add method calls the append method of AvroFileAppender, which in turn calls the write(T datum, Encoder out) method of GenericAvroWriter:

  @Override
  public void write(T datum, Encoder out) throws IOException {
    writer.write(datum, out);
  }
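To keep the call chain straight, here is a hedged summary of the path a single ManifestFile record takes when the manifest list is written (class names follow this walkthrough; details simplified):

// writer.addAll(manifestFiles)                       // ManifestListWriter
//   -> add(manifest) -> prepare(manifest)            // wraps it as an IndexedManifestFile
//   -> AvroFileAppender.add(datum)                   // numRecords += 1; writer.append(datum)
//     -> Avro DataFileWriter.append(datum)
//       -> GenericAvroWriter.write(datum, encoder)
//         -> StructWriter/RecordWriter.write(row, encoder)
//           -> writers[i].write(get(row, i), encoder)  // per-field, e.g. encoder.writeInt(...)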

The writer here is a ValueWriter; the concrete implementation is RecordWriter, whose parent is the abstract class StructWriter, and it is StructWriter's write method that gets called:

    @Override
    public void write(S row, Encoder encoder) throws IOException {
      for (int i = 0; i < writers.length; i += 1) {
        writers[i].write(get(row, i), encoder);
      }
    }

The fields of the manifest object are then written one by one; get(S struct, int pos) is RecordWriter's method for fetching the value at each position of the record:

    @Override
    protected Object get(IndexedRecord struct, int pos) {
      return struct.get(pos);
    }

struct.get(pos) ends up in the get method of IndexedManifestFile, because IndexedManifestFile implements the put and get methods of the org.apache.avro.generic.IndexedRecord interface and can therefore be written as an Avro record; the get method below returns the value for each position:

@Override
    public Object get(int pos) {
      switch (pos) {
        case 0:
          return wrapped.path();
        case 1:
          return wrapped.length();
        case 2:
          return wrapped.partitionSpecId();
        case 3:
          return wrapped.content().id();
        case 4:
          if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
            // if the sequence number is being assigned here, then the manifest must be created by the current
            // operation. to validate this, check that the snapshot id matches the current commit
            Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(),
                "Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId());
            return sequenceNumber;
          } else {
            return wrapped.sequenceNumber();
          }
        case 5:
          if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
            // same sanity check as above
            Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(),
                "Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId());
            // if the min sequence number is not determined, then there was no assigned sequence number for any file
            // written to the wrapped manifest. replace the unassigned sequence number with the one for this commit
            return sequenceNumber;
          } else {
            return wrapped.minSequenceNumber();
          }
        case 6:
          return wrapped.snapshotId();
        case 7:
          return wrapped.addedFilesCount();
        case 8:
          return wrapped.existingFilesCount();
        case 9:
          return wrapped.deletedFilesCount();
        case 10:
          return wrapped.addedRowsCount();
        case 11:
          return wrapped.existingRowsCount();
        case 12:
          return wrapped.deletedRowsCount();
        case 13:
          return wrapped.partitions();
        default:
          throw new UnsupportedOperationException("Unknown field ordinal: " + pos);
      }
    }

Here you can see that if wrapped.minSequenceNumber() is unassigned (-1), the sequenceNumber carried by the current IndexedManifestFile (the sequence number of this commit) is written instead; the same applies to the sequenceNumber case.

The field value is then written to the encoder; for example, for an integer field:

    @Override
    public void write(Integer i, Encoder encoder) throws IOException {
      encoder.writeInt(i);
    }

The Snapshot implementation class is BaseSnapshot, which holds the generated manifest list Avro file name, the snapshotId, sequenceNumber, operation, and a summary with the numbers of files added and deleted:

  BaseSnapshot(FileIO io,
               long sequenceNumber,
               long snapshotId,
               Long parentId,
               long timestampMillis,
               String operation,
               Map<String, String> summary,
               String manifestList) {
    this.io = io;
    this.sequenceNumber = sequenceNumber;
    this.snapshotId = snapshotId;
    this.parentId = parentId;
    this.timestampMillis = timestampMillis;
    this.operation = operation;
    this.summary = summary;
    this.manifestListLocation = manifestList;
  }

commit() then calls base.replaceCurrentSnapshot(newSnapshot) to create a new TableMetadata object:

public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
    // there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
    if (snapshotsById.containsKey(snapshot.snapshotId())) {
      return setCurrentSnapshotTo(snapshot);
    }

    ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
        "Cannot add snapshot with sequence number %s older than last sequence number %s",
        snapshot.sequenceNumber(), lastSequenceNumber);

    List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
        .addAll(snapshots)
        .add(snapshot)
        .build();
    List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
        .addAll(snapshotLog)
        .add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
        .build();

    return new TableMetadata(null, formatVersion, uuid, location,
        snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
        currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
        defaultSortOrderId, sortOrders, rowKey,
        properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
  }

The commit is then performed through the commit method of TableOperations; here is the HadoopTableOperations implementation:

public void commit(TableMetadata base, TableMetadata metadata) {
    Pair<Integer, TableMetadata> current = versionAndMetadata();
    if (base != current.second()) {
      throw new CommitFailedException("Cannot commit changes based on stale table metadata");
    }

    if (base == metadata) {
      LOG.info("Nothing to commit.");
      return;
    }

    Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
        "Hadoop path-based tables cannot be relocated"); Preconditions.checkArgument( ! metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),"Hadoop path-based tables cannot relocate metadata");

    String codecName = metadata.property(
        TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
    TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
    String fileExtension = TableMetadataParser.getFileExtension(codec);
    Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
    TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));

    int nextVersion = (current.first() != null ? current.first() : 0) + 1;
    Path finalMetadataFile = metadataFilePath(nextVersion, codec);
    FileSystem fs = getFileSystem(tempMetadataFile, conf);

    try {
      if (fs.exists(finalMetadataFile)) {
        throw new CommitFailedException(
            "Version %d already exists: %s", nextVersion, finalMetadataFile); }}catch (IOException e) {
      throw new RuntimeIOException(e,
          "Failed to check if next version exists: %s", finalMetadataFile);
    }

    // this rename operation is the atomic commit operation
    renameToFinal(fs, tempMetadataFile, finalMetadataFile);

    // update the best-effort version pointer
    writeVersionHint(nextVersion);

    deleteRemovedMetadataFiles(base, metadata);

    this.shouldRefresh = true;
  }

The versionAndMetadata() method is called first:

  private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
    return Pair.of(version, currentMetadata);
  }

currentMetadata is the TableMetadata object holding the information from the current metadata.json file; version is the current version number.

A temporary metadata file, tempMetadataFile, is then created and written to disk by calling write in TableMetadataParser:

public static void write(TableMetadata metadata, OutputFile outputFile) {
    internalWrite(metadata, outputFile, false);
  }

  public static void internalWrite(
      TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
    boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
    OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
    try (OutputStream ou = isGzip ? new GZIPOutputStream(stream) : stream;
         OutputStreamWriter writer = new OutputStreamWriter(ou, StandardCharsets.UTF_8)) {
      JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
      generator.useDefaultPrettyPrinter();
      toJson(metadata, generator);
      generator.flush();
    } catch (IOException e) {
      throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile); }}Copy the code

toJson contains the main metadata serialization code. After writing, the commit computes the new version number and the final file name, renames the temporary file to the final one (this rename is the atomic commit), writes the version-hint file, and removes old metadata files that are no longer needed.
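As a closing sketch, the atomic-commit idea described above can be reduced to the following (simplified, with hypothetical helper variables such as metadataDir, nextVersion, fs, and io; the real renameToFinal also deals with lock tables and filesystem-specific rename semantics):

import java.util.UUID;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.iceberg.TableMetadataParser;
import org.apache.iceberg.exceptions.CommitFailedException;

// 1. Write the new metadata to a temporary, uniquely named file.
Path temp = new Path(metadataDir, UUID.randomUUID() + ".metadata.json");   // metadataDir is hypothetical
TableMetadataParser.write(metadata, io.newOutputFile(temp.toString()));

// 2. Move it to the final versioned name in a single rename, so readers never observe
//    a half-written vN.metadata.json.
Path finalFile = new Path(metadataDir, "v" + nextVersion + ".metadata.json");
if (!fs.rename(temp, finalFile)) {
  throw new CommitFailedException("Failed to commit changes using rename: %s", finalFile);
}
// 3. Best-effort version hint for the next reader, then clean up metadata that is no longer referenced.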