TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg2");
tableLoader.open();
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
.rewriteDataFiles()
.execute();
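The snippet above is the minimal entry point. The action also exposes optional knobs that set the fields execute() relies on (targetSizeInBytes, splitLookback, splitOpenFileCost, filter). A minimal, hedged sketch assuming the builder-style setters of BaseRewriteDataFilesAction (method names may differ across Iceberg versions; the "day" partition filter is purely illustrative):

// Hedged sketch: same entry point as above, with the optional knobs spelled out.
// Setter names come from BaseRewriteDataFilesAction and may vary by Iceberg version.
// Expressions is org.apache.iceberg.expressions.Expressions; "day" is a hypothetical partition column.
TableLoader tableLoader = TableLoader.fromHadoopTable("file:///tmp/iceberg2");
tableLoader.open();
Table table = tableLoader.loadTable();
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .targetSizeInBytes(128 * 1024 * 1024L)              // target size for merged files
    .splitLookback(10)                                  // bin-packing lookback used by planTasks
    .splitOpenFileCost(4 * 1024 * 1024L)                // cost charged for each opened file
    .filter(Expressions.equal("day", "2021-11-01"))     // hypothetical partition filter
    .execute();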
This is Flink's small-file merge entry code. Behind it sits an Actions class:
public class Actions {
public static final Configuration CONFIG = new Configuration()
// disable classloader check as Avro may cache class/object in the serializers.
.set(CoreOptions.CHECK_LEAKED_CLASSLOADER, false);
private StreamExecutionEnvironment env;
private Table table;
private Actions(StreamExecutionEnvironment env, Table table) {
this.env = env;
this.table = table;
}
public static Actions forTable(StreamExecutionEnvironment env, Table table) {
return new Actions(env, table);
}
public static Actions forTable(Table table) {
return new Actions(StreamExecutionEnvironment.getExecutionEnvironment(CONFIG), table);
}
public RewriteDataFilesAction rewriteDataFiles() {
return new RewriteDataFilesAction(env, table);
}
}
This Actions class corresponds roughly to its counterpart in Spark. Its constructor is private, so the static forTable methods are the way to pass in the table and instantiate it. Calling rewriteDataFiles() returns a RewriteDataFilesAction object, which implements the small-file merge; calling execute() on it starts the merge. execute() is defined in RewriteDataFilesAction's parent class, BaseRewriteDataFilesAction, and contains the main merge logic:
public RewriteDataFilesActionResult execute() {
CloseableIterable<FileScanTask> fileScanTasks = null;
try {
fileScanTasks = table.newScan()
.caseSensitive(caseSensitive)
.ignoreResiduals()
.filter(filter)
.planFiles();
} finally {
try {
if (fileScanTasks != null) {
fileScanTasks.close();
}
} catch (IOException ioe) {
LOG.warn("Failed to close task iterable", ioe);
}
}
Map<StructLikeWrapper, Collection<FileScanTask>> groupedTasks = groupTasksByPartition(fileScanTasks.iterator());
Map<StructLikeWrapper, Collection<FileScanTask>> filteredGroupedTasks = groupedTasks.entrySet().stream()
.filter(kv -> kv.getValue().size() > 1)
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
// Nothing to rewrite if there's only one DataFile in each partition.
if (filteredGroupedTasks.isEmpty()) {
return RewriteDataFilesActionResult.empty();
}
// Split and combine tasks under each partition
List<CombinedScanTask> combinedScanTasks = filteredGroupedTasks.values().stream()
.map(scanTasks -> {
CloseableIterable<FileScanTask> splitTasks = TableScanUtil.splitFiles(
CloseableIterable.withNoopClose(scanTasks), targetSizeInBytes);
return TableScanUtil.planTasks(splitTasks, targetSizeInBytes, splitLookback, splitOpenFileCost);
})
.flatMap(Streams::stream)
.filter(task -> task.files().size() > 1 || isPartialFileScan(task))
.collect(Collectors.toList());
if (combinedScanTasks.isEmpty()) {
return RewriteDataFilesActionResult.empty();
}
List<DataFile> addedDataFiles = rewriteDataForTasks(combinedScanTasks);
List<DataFile> currentDataFiles = combinedScanTasks.stream()
.flatMap(tasks -> tasks.files().stream().map(FileScanTask::file))
.collect(Collectors.toList());
replaceDataFiles(currentDataFiles, addedDataFiles);
return new RewriteDataFilesActionResult(currentDataFiles, addedDataFiles);
}
The planFiles() method was analyzed in the earlier article on Iceberg data reading. It returns an iterable of FileScanTask, each holding a data file plus the delete files that must be applied to it. The tasks are then grouped by partition and turned into a list of CombinedScanTasks: splitFiles splits files according to the target file size, and planTasks bin-packs them into merge tasks, which are then filtered. The condition task.files().size() > 1 keeps a task only when it would merge more than one file; isPartialFileScan(task) additionally keeps tasks that cover only part of a data file. A simplified sketch of the grouping step follows.
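The grouping keys each task by its partition value. A simplified, hedged sketch of that step (the real groupTasksByPartition keys on a StructLikeWrapper of the partition struct; the string-keyed variant below is for illustration only):

// Hedged sketch, not the Iceberg source: group FileScanTasks by partition path.
// Assumes java.util.*, org.apache.iceberg.PartitionSpec and org.apache.iceberg.FileScanTask.
private static Map<String, List<FileScanTask>> groupTasksByPartitionPath(
    PartitionSpec spec, Iterator<FileScanTask> tasks) {
  Map<String, List<FileScanTask>> grouped = new HashMap<>();
  while (tasks.hasNext()) {
    FileScanTask task = tasks.next();
    // partitionToPath renders the partition struct as a path-like string, e.g. "day=2021-11-01"
    String partitionPath = spec.partitionToPath(task.file().partition());
    grouped.computeIfAbsent(partitionPath, key -> new ArrayList<>()).add(task);
  }
  return grouped;
}

These partition groups are what the subsequent splitFiles/planTasks bin-packing turns into CombinedScanTasks.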
In the program above there are five data files in the table, so five FileScanTasks are produced. Two CombinedScanTasks are generated, each containing two FileScanTasks, so four files are merged; the remaining single file is dropped by the files-count-greater-than-one condition. The replaceDataFiles method then rewrites the metadata:
public void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<DataFile> addedDataFiles) {
try {
RewriteFiles rewriteFiles = table.newRewrite();
rewriteFiles.rewriteFiles(Sets.newHashSet(deletedDataFiles), Sets.newHashSet(addedDataFiles));
commit(rewriteFiles);
} catch (Exception e) {
Tasks.foreach(Iterables.transform(addedDataFiles, f -> f.path().toString()))
.noRetry()
.suppressFailureWhenFinished()
.onFailure((location, exc) -> LOG.warn("Failed to delete: {}", location, exc))
.run(fileIO::deleteFile);
throw e;
}
}
RewriteFiles rewriteFiles = table.newRewrite() returns a RewriteFiles object built from the table, carrying the table name and the TableOperations information, which includes the current version of the table. Note that this is the table state captured when the small-file merge started, not necessarily the very latest table version at the moment rewriteFiles runs.
NewRewrite () returns a BaseRewriteFiles object that inherits from MergingSnapshotProducer, BaseRewriteFiles inherit from MergingSnapshotProducer and from SnapshotProducer. This is information that initializes some of the parent SnapshotProducer classes, including one important message, commitUUID
private final String commitUUID = UUID.randomUUID().toString();
commitUUID is the UUID generated for this commit, and the BaseRewriteFiles object represents the snapshot update produced by this rewrite operation. A RewriteFiles operation can record four kinds of files: 1. data files to be deleted; 2. delete files to be deleted; 3. data files to be added; 4. delete files to be added.
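A hedged sketch of how these file groups map onto the RewriteFiles API (the two-argument rewriteFiles overload is the one replaceDataFiles uses above; a four-argument overload that also replaces delete files exists only in Iceberg versions with v2 delete-file support, so check your version; the *ToDelete / *ToAdd sets below are placeholders):

// Hedged sketch: the rewriteFiles(...) overloads (availability depends on the Iceberg version).
RewriteFiles rewrite = table.newRewrite();
// v1-style rewrite: replace a set of data files with the newly written, merged data files
rewrite.rewriteFiles(Sets.newHashSet(dataFilesToDelete), Sets.newHashSet(dataFilesToAdd));
// v2-style rewrite (later versions, signature may differ): data files and delete files together
// rewrite.rewriteFiles(dataFilesToDelete, deleteFilesToDelete, dataFilesToAdd, deleteFilesToAdd);
rewrite.commit();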
RewriteFiles extends SnapshotUpdate, making the rewrite a snapshot-update operation; its implementation class is BaseRewriteFiles, whose parent chain runs through the abstract MergingSnapshotProducer up to SnapshotProducer. The commit() method is implemented in SnapshotProducer:
@Override
public void commit() {
// this is always set to the latest commit attempt's snapshot id.
AtomicLong newSnapshotId = new AtomicLong(-1L);
try {
Tasks.foreach(ops)
.retry(base.propertyAsInt(COMMIT_NUM_RETRIES, COMMIT_NUM_RETRIES_DEFAULT))
.exponentialBackoff(
base.propertyAsInt(COMMIT_MIN_RETRY_WAIT_MS, COMMIT_MIN_RETRY_WAIT_MS_DEFAULT),
base.propertyAsInt(COMMIT_MAX_RETRY_WAIT_MS, COMMIT_MAX_RETRY_WAIT_MS_DEFAULT),
base.propertyAsInt(COMMIT_TOTAL_RETRY_TIME_MS, COMMIT_TOTAL_RETRY_TIME_MS_DEFAULT),
2.0 /* exponential */)
.onlyRetryOn(CommitFailedException.class)
.run(taskOps -> {
Snapshot newSnapshot = apply();
newSnapshotId.set(newSnapshot.snapshotId());
TableMetadata updated;
if (stageOnly) {
updated = base.addStagedSnapshot(newSnapshot);
} else {
updated = base.replaceCurrentSnapshot(newSnapshot);
}
if (updated == base) {
// do not commit if the metadata has not changed. for example, this may happen when setting the current
// snapshot to an ID that is already current. note that this check uses identity.
return;
}
// if the table UUID is missing, add it here. the UUID will be re-created each time this operation retries
// to ensure that if a concurrent operation assigns the UUID, this operation will not fail.
taskOps.commit(base, updated.withUUID());
});
} catch (CommitStateUnknownException commitStateUnknownException) {
throw commitStateUnknownException;
} catch (RuntimeException e) {
Exceptions.suppressAndThrow(e, this::cleanAll);
}
LOG.info("Committed snapshot {} ({})", newSnapshotId.get(), getClass().getSimpleName());
try {
// at this point, the commit must have succeeded. after a refresh, the snapshot is loaded by
// id in case another commit was added between this commit and the refresh.
Snapshot saved = ops.refresh().snapshot(newSnapshotId.get());
if (saved != null) {
cleanUncommitted(Sets.newHashSet(saved.allManifests()));
// also clean up unused manifest lists created by multiple attempts
for (String manifestList : manifestLists) {
if (!saved.manifestListLocation().equals(manifestList)) {
deleteFile(manifestList);
}
}
} else {
// saved may not be present if the latest metadata couldn't be loaded due to eventual
// consistency problems in refresh. in that case, don't clean up.
LOG.warn("Failed to load committed snapshot, skipping manifest clean-up"); }}catch (RuntimeException e) {
LOG.warn("Failed to load committed table metadata, skipping manifest clean-up", e);
}
notifyListeners();
}
commit() calls apply() to build the new Snapshot; apply() is implemented in the SnapshotProducer class:
public Snapshot apply() {
this.base = refresh();
Long parentSnapshotId = base.currentSnapshot() != null ?
base.currentSnapshot().snapshotId() : null;
long sequenceNumber = base.nextSequenceNumber();
// run validations from the child operation
validate(base);
List<ManifestFile> manifests = apply(base);
if (base.formatVersion() > 1 || base.propertyAsBoolean(MANIFEST_LISTS_ENABLED, MANIFEST_LISTS_ENABLED_DEFAULT)) {
OutputFile manifestList = manifestListPath();
try (ManifestListWriter writer = ManifestLists.write(
ops.current().formatVersion(), manifestList, snapshotId(), parentSnapshotId, sequenceNumber)) {
// keep track of the manifest lists created
manifestLists.add(manifestList.location());
ManifestFile[] manifestFiles = new ManifestFile[manifests.size()];
Tasks.range(manifestFiles.length)
.stopOnFailure().throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(index ->
manifestFiles[index] = manifestsWithMetadata.get(manifests.get(index)));
writer.addAll(Arrays.asList(manifestFiles));
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write manifest list file");
}
return new BaseSnapshot(ops.io(),
sequenceNumber, snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base),
manifestList.location());
} else {
return new BaseSnapshot(ops.io(), snapshotId(), parentSnapshotId, System.currentTimeMillis(), operation(), summary(base), manifests);
}
}
The refresh() method is called first; for a Hadoop table this ultimately lands in HadoopTableOperations.refresh():
public TableMetadata refresh() {
int ver = version != null ? version : findVersion();
try {
Path metadataFile = getMetadataFile(ver);
if (version == null && metadataFile == null && ver == 0) {
// no v0 metadata means the table doesn't exist yet
return null;
} else if (metadataFile == null) {
throw new ValidationException("Metadata file for version %d is missing", ver);
}
Path nextMetadataFile = getMetadataFile(ver + 1);
while (nextMetadataFile != null) {
ver += 1;
metadataFile = nextMetadataFile;
nextMetadataFile = getMetadataFile(ver + 1);
}
updateVersionAndMetadata(ver, metadataFile.toString());
this.shouldRefresh = false;
return currentMetadata;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to refresh the table"); }}Copy the code
As you can see, it keeps probing for the next version number in a loop until no newer metadata file exists, so it ends up on the latest version; the latest metadata is then loaded, assigned to currentMetadata, and returned.
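For orientation, an illustrative layout of a Hadoop-catalog table directory (example paths only; the file names follow the conventions visible in the code in this article):

file:///tmp/iceberg2/data/...                             -- data files written by Flink
file:///tmp/iceberg2/metadata/v1.metadata.json            -- table metadata, one file per version
file:///tmp/iceberg2/metadata/v2.metadata.json            -- refresh() probes v(ver+1) until it is missing
file:///tmp/iceberg2/metadata/version-hint.text           -- best-effort version pointer (writeVersionHint)
file:///tmp/iceberg2/metadata/snap-&lt;snapshotId&gt;-...avro   -- manifest lists, one per snapshot
file:///tmp/iceberg2/metadata/&lt;commitUUID&gt;-m0.avro        -- manifest files written by a commit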
Back in SnapshotProducer.apply(), the overloaded apply(TableMetadata base) is called next; this is where the manifest files are written to disk and a List&lt;ManifestFile&gt; is returned. That apply(base) is implemented in the abstract MergingSnapshotProducer class:
public List<ManifestFile> apply(TableMetadata base) {
Snapshot current = base.currentSnapshot();
// current.dataManifests() returns the latest data manifests; filterManifests picks out the ones to retain
List<ManifestFile> filtered = filterManager.filterManifests(
base.schema(), current != null ? current.dataManifests() : null);
// Iterate over all retained data manifests to get the smallest sequence number
long minDataSequenceNumber = filtered.stream()
.map(ManifestFile::minSequenceNumber)
.filter(seq -> seq > 0) // filter out unassigned sequence numbers in rewritten manifests
.reduce(base.lastSequenceNumber(), Math::min);
// Drop delete files older than the smallest data sequence number (this only records the filter criteria)
deleteFilterManager.dropDeleteFilesOlderThan(minDataSequenceNumber);
List<ManifestFile> filteredDeletes = deleteFilterManager.filterManifests(
base.schema(), current != null ? current.deleteManifests() : null);
// only keep manifests that have live data files or that were written by this commit
Predicate<ManifestFile> shouldKeep = manifest ->
manifest.hasAddedFiles() || manifest.hasExistingFiles() || manifest.snapshotId() == snapshotId();
Iterable<ManifestFile> unmergedManifests = Iterables.filter(
Iterables.concat(prepareNewManifests(), filtered), shouldKeep);
Iterable<ManifestFile> unmergedDeleteManifests = Iterables.filter(
Iterables.concat(prepareDeleteManifests(), filteredDeletes), shouldKeep);
// update the snapshot summary
summaryBuilder.clear();
summaryBuilder.merge(addedFilesSummary);
summaryBuilder.merge(appendedManifestsSummary);
summaryBuilder.merge(filterManager.buildSummary(filtered));
summaryBuilder.merge(deleteFilterManager.buildSummary(filteredDeletes));
List<ManifestFile> manifests = Lists.newArrayList();
Iterables.addAll(manifests, mergeManager.mergeManifests(unmergedManifests));
Iterables.addAll(manifests, deleteMergeManager.mergeManifests(unmergedDeleteManifests));
return manifests;
}
The filterManifests method of filterManager is called to pick out the data manifest files to retain:
/**
* Filter deleted files out of a list of manifests.
*
* @param tableSchema the current table schema
* @param manifests a list of manifests to be filtered
* @return an array of filtered manifests
*/
List<ManifestFile> filterManifests(Schema tableSchema, List<ManifestFile> manifests) {
if (manifests == null || manifests.isEmpty()) {
validateRequiredDeletes();
return ImmutableList.of();
}
// use a common metrics evaluator for all manifests because it is bound to the table schema
StrictMetricsEvaluator metricsEvaluator = new StrictMetricsEvaluator(tableSchema, deleteExpression);
ManifestFile[] filtered = new ManifestFile[manifests.size()];
// open all of the manifest files in parallel, use index to avoid reordering
// each ManifestFile is filtered by calling filterManifest; when files are removed, a rewritten manifest is written to disk and a new ManifestFile object is returned
Tasks.range(filtered.length)
.stopOnFailure().throwFailureWhenFinished()
.executeWith(ThreadPools.getWorkerPool())
.run(index -> {
ManifestFile manifest = filterManifest(metricsEvaluator, manifests.get(index));
filtered[index] = manifest;
});
validateRequiredDeletes(filtered);
return Arrays.asList(filtered);
}
deletePaths is a member variable of the ManifestFilterManager class; it holds all of the data files that this rewrite needs to delete. validateRequiredDeletes then re-validates the filtered manifests, checking that every path in deletePaths is included among the files deleted by the new ManifestFiles:
/**
* Throws a {@link ValidationException} if any deleted file was not present in a filtered manifest.
*
* @param manifests a set of filtered manifests
*/
private void validateRequiredDeletes(ManifestFile... manifests) {
if (failMissingDeletePaths) {
Set<CharSequence> deletedFiles = deletedFiles(manifests);
ValidationException.check(deletedFiles.containsAll(deletePaths),
"Missing required files to delete: %s",
COMMA.join(Iterables.filter(deletePaths, path -> !deletedFiles.contains(path))));
}
}
The filterManifest method is then called to filter the Manifest file
/**
 * @return a ManifestReader that is a filtered version of the input manifest.
*/
private ManifestFile filterManifest(StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest) {
ManifestFile cached = filteredManifests.get(manifest);
if (cached != null) {
return cached;
}
boolean hasLiveFiles = manifest.hasAddedFiles() || manifest.hasExistingFiles();
if (!hasLiveFiles || !canContainDeletedFiles(manifest)) {
filteredManifests.put(manifest, manifest);
return manifest;
}
try (ManifestReader<F> reader = newManifestReader(manifest)) {
// this assumes that the manifest doesn't have files to remove and streams through the
// manifest without copying data. if a manifest does have a file to remove, this will break
// out of the loop and move on to filtering the manifest.
boolean hasDeletedFiles = manifestHasDeletedFiles(metricsEvaluator, reader);
if (!hasDeletedFiles) {
filteredManifests.put(manifest, manifest);
return manifest;
}
return filterManifestWithDeletedFiles(metricsEvaluator, manifest, reader);
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest: %s", manifest); }}Copy the code
The resource declared in the try (...) statement must implement java.io.Closeable (a subtype of AutoCloseable) and is closed automatically when the block exits; the ManifestReader class implements java.io.Closeable.
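A minimal, generic illustration of try-with-resources (plain Java, not Iceberg code; the file path is purely illustrative):

try (BufferedReader in = new BufferedReader(new FileReader("/tmp/example.txt"))) {
  System.out.println(in.readLine());
} // in.close() runs here automatically, even if readLine() throws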
The newManifestReader(manifest) method opens the Avro manifest file for reading; ultimately org.apache.iceberg.avro.AvroIterable calls DataFileReader.openReader to open the file. In this run the four manifest Avro files of the example table are opened in one pass and some metadata is initialized.
filterManifest then calls manifestHasDeletedFiles to check whether the manifest contains files that need to be deleted:
private boolean manifestHasDeletedFiles(
StrictMetricsEvaluator metricsEvaluator, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();
Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
Evaluator strict = strictDeleteEvaluator(reader.spec());
boolean hasDeletedFiles = false;
for (ManifestEntry<F> entry : reader.entries()) {
F file = entry.file();
boolean fileDelete = deletePaths.contains(file.path()) ||
dropPartitions.contains(file.specId(), file.partition()) ||
(isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
if (fileDelete || inclusive.eval(file.partition())) {
ValidationException.check(
fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
"Cannot delete file where some, but not all, rows match filter %s: %s".this.deleteExpression, file.path());
hasDeletedFiles = true;
if (failAnyDelete) {
throw new DeleteException(reader.spec().partitionToPath(file.partition()));
}
break; // as soon as a deleted file is detected, stop scanning
}
}
return hasDeletedFiles;
}
Recall that deletePaths holds all of the data files that this rewrite needs to delete. A file is judged deleted under three conditions: (1) deletePaths.contains(file.path()) — deletePaths only ever contains data files; (2) dropPartitions.contains(file.specId(), file.partition()) — its whole partition is being dropped; (3) for a delete file, its sequenceNumber is greater than 0 and smaller than minSequenceNumber. minSequenceNumber is scoped to the table and denotes the smallest sequence number among the data files that are not being deleted; a delete file with a smaller sequence number can no longer affect any remaining data file and can itself be deleted.
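A small worked example may make the minSequenceNumber rule concrete (numbers are illustrative only):

// Suppose the data files kept by the surviving data manifests were written at sequence numbers
// {3, 4, 5}, so the minimum data sequence number is 3. A delete file can only affect data files
// written at or before its own sequence number, so a delete file at sequence number 2 no longer
// filters any live data file and may itself be dropped.
long minDataSequenceNumber = LongStream.of(3L, 4L, 5L).min().orElse(Long.MAX_VALUE); // = 3
long deleteFileSequenceNumber = 2L;
boolean stillNeeded = deleteFileSequenceNumber >= minDataSequenceNumber;              // false -> droppable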
At the end of filterManifest, filterManifestWithDeletedFiles is called to rewrite the manifest file:
private ManifestFile filterManifestWithDeletedFiles(
StrictMetricsEvaluator metricsEvaluator, ManifestFile manifest, ManifestReader<F> reader) {
boolean isDelete = reader.isDeleteManifestReader();
Evaluator inclusive = inclusiveDeleteEvaluator(reader.spec());
Evaluator strict = strictDeleteEvaluator(reader.spec());
// when this point is reached, there is at least one file that will be deleted in the
// manifest. produce a copy of the manifest with all deleted files removed.
List<F> deletedFiles = Lists.newArrayList();
Set<CharSequenceWrapper> deletedPaths = Sets.newHashSet();
try {
ManifestWriter<F> writer = newManifestWriter(reader.spec());
try {
reader.entries().forEach(entry -> {
F file = entry.file();
boolean fileDelete = deletePaths.contains(file.path()) ||
dropPartitions.contains(file.specId(), file.partition()) ||
(isDelete && entry.sequenceNumber() > 0 && entry.sequenceNumber() < minSequenceNumber);
if (entry.status() != ManifestEntry.Status.DELETED) {
if (fileDelete || inclusive.eval(file.partition())) {
ValidationException.check(
fileDelete || strict.eval(file.partition()) || metricsEvaluator.eval(file),
"Cannot delete file where some, but not all, rows match filter %s: %s".this.deleteExpression, file.path());
writer.delete(entry);
CharSequenceWrapper wrapper = CharSequenceWrapper.wrap(entry.file().path());
if (deletedPaths.contains(wrapper)) {
LOG.warn("Deleting a duplicate path from manifest {}: {}",
manifest.path(), wrapper.get());
duplicateDeleteCount += 1;
} else {
// only add the file to deletes if it is a new delete
// this keeps the snapshot summary accurate for non-duplicate data
deletedFiles.add(entry.file().copyWithoutStats());
}
deletedPaths.add(wrapper);
} else {
writer.existing(entry);
}
}
});
} finally {
writer.close();
}
// return the filtered manifest as a reader
ManifestFile filtered = writer.toManifestFile();
// update caches
filteredManifests.put(manifest, filtered);
filteredManifestToDeletedFiles.put(filtered, deletedFiles);
return filtered;
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to close manifest writer"); }}Copy the code
ManifestWriter&lt;F&gt; writer = newManifestWriter(reader.spec()): a new ManifestWriter is constructed from the reader's partition spec and used to write the rewritten entries into a new Avro file:
@Override
protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec manifestSpec) {
return MergingSnapshotProducer.this.newManifestWriter(manifestSpec);
}
protected OutputFile newManifestOutput() {
return ops.io().newOutputFile(
ops.metadataFileLocation(FileFormat.AVRO.addExtension(commitUUID + "-m" + manifestCount.getAndIncrement())));
}
protected ManifestWriter<DataFile> newManifestWriter(PartitionSpec spec) {
return ManifestFiles.write(ops.current().formatVersion(), spec, newManifestOutput(), snapshotId());
}
newManifestOutput() creates the output file; the Avro manifest is named with the commitUUID plus "-m" plus an incrementing counter (for example &lt;commitUUID&gt;-m0.avro).
writer.delete(entry) records the entry of the file to be deleted:
void delete(ManifestEntry<F> entry) {
// Use the current Snapshot ID for the delete. It is safe to delete the data file from disk
// when this Snapshot has been removed or when there are no Snapshots older than this one.
addEntry(reused.wrapDelete(snapshotId, entry.file()));
}
Here the data file to be deleted is added to the current ManifestWriter via the addEntry method, wrapped with the current snapshotId. Ultimately the add method of org.apache.iceberg.avro.AvroFileAppender calls writer.append to write the data.
Back in SnapshotProducer.apply(), manifests now holds all of the manifest files. For newly generated manifests the sequenceNumber field has not been assigned yet and is -1; manifests that were not rewritten keep their original sequenceNumber. A ManifestListWriter is then used to write the snap-*.avro manifest-list file. The manifests are copied into the manifestFiles array and added to the writer via writer.addAll(Arrays.asList(manifestFiles)):
@Override
public void addAll(Iterable<ManifestFile> values) {
values.forEach(this::add);
}
@Override
public void add(ManifestFile manifest) {
writer.add(prepare(manifest));
}
@Override
protected ManifestFile prepare(ManifestFile manifest) {
return wrapper.wrap(manifest);
}
The wrapper used in prepare(manifest) is an IndexedManifestFile, which wraps each ManifestFile together with the commitSnapshotId and sequenceNumber of this commit:
private final long commitSnapshotId;
private final long sequenceNumber;
private ManifestFile wrapped = null;
IndexedManifestFile(long commitSnapshotId, long sequenceNumber) {
this.commitSnapshotId = commitSnapshotId;
this.sequenceNumber = sequenceNumber;
}
public ManifestFile wrap(ManifestFile file) {
this.wrapped = file;
return this;
}
The writer used in the add(manifest) method is a FileAppender; its implementation class is AvroFileAppender, which writes the manifest-list entries:
@Override
public void add(D datum) {
try {
numRecords += 1L;
writer.append(datum);
} catch (IOException e) {
throw new RuntimeIOException(e);
}
}
The add method calls AvroFileAppender's append, which in turn calls the write(T datum, Encoder out) method of GenericAvroWriter:
@Override
public void write(T datum, Encoder out) throws IOException {
writer.write(datum, out);
}
The inner writer here is a ValueWriter; the implementation class is RecordWriter, which extends the abstract StructWriter, so StructWriter's write method is what gets called:
@Override
public void write(S row, Encoder encoder) throws IOException {
for (int i = 0; i < writers.length; i += 1) {
writers[i].write(get(row, i), encoder);
}
}
Each field of the manifest object is then written one by one; get(S struct, int pos) is the RecordWriter method that fetches the value at each position of the record:
@Override
protected Object get(IndexedRecord struct, int pos) {
return struct.get(pos);
}
struct.get(pos) ends up in IndexedManifestFile's get method: IndexedManifestFile implements the put and get methods of the org.apache.avro.generic.IndexedRecord interface, so it can be written to Avro directly. The get method below returns the value for each position:
@Override
public Object get(int pos) {
switch (pos) {
case 0:
return wrapped.path();
case 1:
return wrapped.length();
case 2:
return wrapped.partitionSpecId();
case 3:
return wrapped.content().id();
case 4:
if (wrapped.sequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
// if the sequence number is being assigned here, then the manifest must be created by the current
// operation. to validate this, check that the snapshot id matches the current commit
Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(),
"Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId());
return sequenceNumber;
} else {
return wrapped.sequenceNumber();
}
case 5:
if (wrapped.minSequenceNumber() == ManifestWriter.UNASSIGNED_SEQ) {
// same sanity check as above
Preconditions.checkState(commitSnapshotId == wrapped.snapshotId(),
"Found unassigned sequence number for a manifest from snapshot: %s", wrapped.snapshotId());
// if the min sequence number is not determined, then there was no assigned sequence number for any file
// written to the wrapped manifest. replace the unassigned sequence number with the one for this commit
return sequenceNumber;
} else {
return wrapped.minSequenceNumber();
}
case 6:
return wrapped.snapshotId();
case 7:
return wrapped.addedFilesCount();
case 8:
return wrapped.existingFilesCount();
case 9:
return wrapped.deletedFilesCount();
case 10:
return wrapped.addedRowsCount();
case 11:
return wrapped.existingRowsCount();
case 12:
return wrapped.deletedRowsCount();
case 13:
return wrapped.partitions();
default:
throw new UnsupportedOperationException("Unknown field ordinal: "+ pos); }}Copy the code
Here you can see that if wrapped.sequenceNumber() or wrapped.minSequenceNumber() is still unassigned (-1), the sequenceNumber carried by the current IndexedManifestFile, i.e. this commit's sequence number, is written instead.
The value is finally written to the encoder:
@Override
public void write(Integer i, Encoder encoder) throws IOException {
encoder.writeInt(i);
}
The Snapshot implementation class is BaseSnapshot, which holds the location of the generated snap-*.avro manifest list, the snapshotId, sequenceNumber, operation, and a summary with the numbers of added and deleted files:
BaseSnapshot(FileIO io,
long sequenceNumber,
long snapshotId,
Long parentId,
long timestampMillis,
String operation,
Map<String, String> summary,
String manifestList) {
this.io = io;
this.sequenceNumber = sequenceNumber;
this.snapshotId = snapshotId;
this.parentId = parentId;
this.timestampMillis = timestampMillis;
this.operation = operation;
this.summary = summary;
this.manifestListLocation = manifestList;
}
commit() then calls base.replaceCurrentSnapshot(newSnapshot) to create the new TableMetadata object:
public TableMetadata replaceCurrentSnapshot(Snapshot snapshot) {
// there can be operations (viz. rollback, cherrypick) where an existing snapshot could be replacing current
if (snapshotsById.containsKey(snapshot.snapshotId())) {
return setCurrentSnapshotTo(snapshot);
}
ValidationException.check(formatVersion == 1 || snapshot.sequenceNumber() > lastSequenceNumber,
"Cannot add snapshot with sequence number %s older than last sequence number %s",
snapshot.sequenceNumber(), lastSequenceNumber);
List<Snapshot> newSnapshots = ImmutableList.<Snapshot>builder()
.addAll(snapshots)
.add(snapshot)
.build();
List<HistoryEntry> newSnapshotLog = ImmutableList.<HistoryEntry>builder()
.addAll(snapshotLog)
.add(new SnapshotLogEntry(snapshot.timestampMillis(), snapshot.snapshotId()))
.build();
return new TableMetadata(null, formatVersion, uuid, location,
snapshot.sequenceNumber(), snapshot.timestampMillis(), lastColumnId,
currentSchemaId, schemas, defaultSpecId, specs, lastAssignedPartitionId,
defaultSortOrderId, sortOrders, rowKey,
properties, snapshot.snapshotId(), newSnapshots, newSnapshotLog, addPreviousFile(file, lastUpdatedMillis));
}
Finally the commit method of TableOperations is invoked; here is the commit implementation in HadoopTableOperations:
public void commit(TableMetadata base, TableMetadata metadata) {
Pair<Integer, TableMetadata> current = versionAndMetadata();
if (base != current.second()) {
throw new CommitFailedException("Cannot commit changes based on stale table metadata");
}
if (base == metadata) {
LOG.info("Nothing to commit.");
return;
}
Preconditions.checkArgument(base == null || base.location().equals(metadata.location()),
"Hadoop path-based tables cannot be relocated"); Preconditions.checkArgument( ! metadata.properties().containsKey(TableProperties.WRITE_METADATA_LOCATION),"Hadoop path-based tables cannot relocate metadata");
String codecName = metadata.property(
TableProperties.METADATA_COMPRESSION, TableProperties.METADATA_COMPRESSION_DEFAULT);
TableMetadataParser.Codec codec = TableMetadataParser.Codec.fromName(codecName);
String fileExtension = TableMetadataParser.getFileExtension(codec);
Path tempMetadataFile = metadataPath(UUID.randomUUID().toString() + fileExtension);
TableMetadataParser.write(metadata, io().newOutputFile(tempMetadataFile.toString()));
int nextVersion = (current.first() != null ? current.first() : 0) + 1;
Path finalMetadataFile = metadataFilePath(nextVersion, codec);
FileSystem fs = getFileSystem(tempMetadataFile, conf);
try {
if (fs.exists(finalMetadataFile)) {
throw new CommitFailedException(
"Version %d already exists: %s", nextVersion, finalMetadataFile); }}catch (IOException e) {
throw new RuntimeIOException(e,
"Failed to check if next version exists: %s", finalMetadataFile);
}
// this rename operation is the atomic commit operation
renameToFinal(fs, tempMetadataFile, finalMetadataFile);
// update the best-effort version pointer
writeVersionHint(nextVersion);
deleteRemovedMetadataFiles(base, metadata);
this.shouldRefresh = true;
}
The method first fetches the current version and metadata via versionAndMetadata():
private synchronized Pair<Integer, TableMetadata> versionAndMetadata() {
return Pair.of(version, currentMetadata);
}
currentMetadata is the TableMetadata object holding the contents of the current metadata.json file and related information; version is the current version number.
A temporary metadata file, tempMetadataFile, is then created and written to disk by calling TableMetadataParser.write:
public static void write(TableMetadata metadata, OutputFile outputFile) {
internalWrite(metadata, outputFile, false);
}
public static void internalWrite(
TableMetadata metadata, OutputFile outputFile, boolean overwrite) {
boolean isGzip = Codec.fromFileName(outputFile.location()) == Codec.GZIP;
OutputStream stream = overwrite ? outputFile.createOrOverwrite() : outputFile.create();
try (OutputStream ou = isGzip ? new GZIPOutputStream(stream) : stream;
OutputStreamWriter writer = new OutputStreamWriter(ou, StandardCharsets.UTF_8)) {
JsonGenerator generator = JsonUtil.factory().createGenerator(writer);
generator.useDefaultPrettyPrinter();
toJson(metadata, generator);
generator.flush();
} catch (IOException e) {
throw new RuntimeIOException(e, "Failed to write json to file: %s", outputFile); }}Copy the code
toJson contains the main metadata-serialization code. Back in commit(), the new version number and final file name are then determined, the temporary file is renamed to the final metadata file, the version-hint file is written, and obsolete metadata files are deleted.
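To close the loop, a hedged and simplified sketch of the atomic-rename step performed by renameToFinal (the real method has more error handling; metadataFilePath, writeVersionHint, nextVersion, codec, fs and tempMetadataFile are the names used in the commit code above):

// Simplified sketch of the atomic commit: write to a temp file, then rename to vN.metadata.json.
// If another writer created vN first, the rename fails and the whole operation is retried by the
// Tasks.foreach(...).onlyRetryOn(CommitFailedException.class) loop shown earlier.
Path finalMetadataFile = metadataFilePath(nextVersion, codec);   // e.g. v3.metadata.json
if (!fs.rename(tempMetadataFile, finalMetadataFile)) {
  throw new CommitFailedException(
      "Failed to commit changes using rename: %s", finalMetadataFile);
}
writeVersionHint(nextVersion);                                   // best-effort pointer in version-hint.text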