Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,10 @@ public CleanOrphanFilesResult clean()

// delete unused files
candidateDeletes.removeAll(usedFiles);

// protect files referenced by the latest snapshot
candidateDeletes.removeAll(collectLatestSnapshotFiles(branches));

candidateDeletes.stream()
.map(candidates::get)
.forEach(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.paimon.index.IndexFileHandler;
import org.apache.paimon.index.IndexFileMeta;
import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFile;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.manifest.ManifestList;
import org.apache.paimon.schema.SchemaManager;
Expand Down Expand Up @@ -234,12 +236,62 @@ protected Set<Snapshot> safelyGetAllSnapshots(String branch) throws IOException
SnapshotManager snapshotManager = branchTable.snapshotManager();
ChangelogManager changelogManager = branchTable.changelogManager();
TagManager tagManager = branchTable.tagManager();
Set<Snapshot> readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots());

Set<Snapshot> readSnapshots = new HashSet<>();
try {
Snapshot latestSnapshot = snapshotManager.latestSnapshot();
if (latestSnapshot != null) {
readSnapshots.add(latestSnapshot);
Long earliestId = snapshotManager.earliestSnapshotId();
if (earliestId != null) {
for (long id = earliestId; id < latestSnapshot.id(); id++) {
try {
readSnapshots.add(snapshotManager.snapshot(id));
} catch (Exception ignored) {
// snapshot may have been expired by concurrent operations
}
}
}
}
} catch (Exception e) {
LOG.warn("Failed to read latest snapshot, it might have been expired.", e);
}
readSnapshots.addAll(tagManager.taggedSnapshots());
readSnapshots.addAll(changelogManager.safelyGetAllChangelogs());
return readSnapshots;
}

/** Collect all files referenced by the latest snapshot of each branch. */
protected Set<String> collectLatestSnapshotFiles(List<String> branches) {
Set<String> files = new HashSet<>();
for (String branch : branches) {
try {
FileStoreTable branchTable = table.switchToBranch(branch);
Snapshot latest = branchTable.snapshotManager().latestSnapshot();
if (latest != null) {
Set<String> manifests = new HashSet<>();
collectWithoutDataFile(branch, latest, files::add, manifests::add);
ManifestFile manifestFile = branchTable.store().manifestFileFactory().create();
for (String manifest : manifests) {
retryReadingFiles(
() -> manifestFile.readWithIOException(manifest),
Collections.<ManifestEntry>emptyList())
.stream()
.map(ManifestEntry::file)
.forEach(
f -> {
files.add(f.fileName());
f.extraFiles().forEach(files::add);
});
}
}
} catch (Exception e) {
LOG.warn("Failed to re-read latest snapshot for branch {}", branch, e);
}
}
return files;
}

protected void collectWithoutDataFile(
String branch,
Snapshot snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -546,6 +546,55 @@ public void testAbnormallyRemoving() throws Exception {
assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0);
}

@Test
public void testCleanAfterSnapshotExpiration() throws Exception {
// write data to create multiple snapshots
List<List<TestPojo>> committedData = new ArrayList<>();
Map<Long, List<TestPojo>> snapshotData = new HashMap<>();
SnapshotManager snapshotManager = table.snapshotManager();
int commitTimes = 10;
writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes);

int snapshotCount = (int) snapshotManager.snapshotCount();

// expire some old snapshots
int expired = snapshotCount / 2;
Options expireOptions = new Options();
expireOptions.set(CoreOptions.SNAPSHOT_EXPIRE_LIMIT, snapshotCount);
expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, snapshotCount - expired);
expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, snapshotCount - expired);
table.copy(expireOptions.toMap()).newCommit("").expireSnapshots();

// add orphan files to snapshot, changelog, manifest, and data directories
int shouldBeDeleted = 0;
int fileNum = RANDOM.nextInt(5) + 1;

addNonUsedFiles(
new Path(tablePath, "snapshot"), fileNum, Collections.singletonList("UNKNOWN"));
shouldBeDeleted += fileNum;

addNonUsedFiles(
new Path(tablePath, "changelog"), fileNum, Collections.singletonList("UNKNOWN"));
shouldBeDeleted += fileNum;

addNonUsedFiles(
manifestDir,
fileNum,
Arrays.asList("manifest-list-", "manifest-", "index-manifest-", "UNKNOWN-"));
shouldBeDeleted += fileNum;

shouldBeDeleted += randomlyAddNonUsedDataFiles(tablePath);

assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted);

// run orphan clean and verify only orphan files are deleted
LocalOrphanFilesClean orphanFilesClean =
new LocalOrphanFilesClean(
table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2));
List<Path> deleted = orphanFilesClean.clean().getDeletedFilesPath();
validate(deleted, snapshotData, new HashMap<>());
}

@Test
public void testRemovingEmptyDirectories() throws Exception {
List<List<TestPojo>> committedData = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,34 @@ public void endInput() throws IOException {

usedFiles = usedFiles.union(usedManifestFiles);

// protect files referenced by the latest snapshot
DataStream<String> latestSnapshotFiles =
env.fromCollection(branches)
.name("branch-source-protect")
.process(
new ProcessFunction<String, String>() {
@Override
public void processElement(
String branch,
ProcessFunction<String, String>.Context ctx,
Collector<String> out) {
try {
for (String file :
collectLatestSnapshotFiles(
Collections.singletonList(branch))) {
out.collect(file);
}
} catch (Exception e) {
LOG.warn(
"Failed to collect latest snapshot files for branch {}",
branch,
e);
}
}
})
.name("protect-latest-snapshot");
usedFiles = usedFiles.union(latestSnapshotFiles);

final OutputTag<Path> emptyDirOutputTag = new OutputTag<Path>("empty-dir-output") {};
SingleOutputStreamOperator<Tuple2<String, Long>> candidates =
env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,12 @@ case class SparkOrphanFilesClean(
}
}

// union manifest and data files
// union manifest and data files, including files from the latest snapshot
val latestSnapshotFiles = collectLatestSnapshotFiles(branches).asScala.toSeq
val usedFiles = usedManifestFiles
.map(_.manifestName)
.union(dataFiles)
.union(spark.createDataset(latestSnapshotFiles))
.toDF("used_name")

// find candidate files which can be removed
Expand Down