diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java index f8a2e080c329..f88ac54d7e36 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java @@ -116,6 +116,10 @@ public CleanOrphanFilesResult clean() // delete unused files candidateDeletes.removeAll(usedFiles); + + // protect files referenced by the latest snapshot + candidateDeletes.removeAll(collectLatestSnapshotFiles(branches)); + candidateDeletes.stream() .map(candidates::get) .forEach( diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java index 5b1b045a7d00..9b1c1cb5c953 100644 --- a/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java +++ b/paimon-core/src/main/java/org/apache/paimon/operation/OrphanFilesClean.java @@ -26,6 +26,8 @@ import org.apache.paimon.index.IndexFileHandler; import org.apache.paimon.index.IndexFileMeta; import org.apache.paimon.manifest.IndexManifestEntry; +import org.apache.paimon.manifest.ManifestEntry; +import org.apache.paimon.manifest.ManifestFile; import org.apache.paimon.manifest.ManifestFileMeta; import org.apache.paimon.manifest.ManifestList; import org.apache.paimon.schema.SchemaManager; @@ -234,12 +236,62 @@ protected Set safelyGetAllSnapshots(String branch) throws IOException SnapshotManager snapshotManager = branchTable.snapshotManager(); ChangelogManager changelogManager = branchTable.changelogManager(); TagManager tagManager = branchTable.tagManager(); - Set readSnapshots = new HashSet<>(snapshotManager.safelyGetAllSnapshots()); + + Set readSnapshots = new HashSet<>(); + try { + Snapshot latestSnapshot = snapshotManager.latestSnapshot(); + if (latestSnapshot != null) { + readSnapshots.add(latestSnapshot); + Long earliestId = snapshotManager.earliestSnapshotId(); + if (earliestId != null) { + for (long id = earliestId; id < latestSnapshot.id(); id++) { + try { + readSnapshots.add(snapshotManager.snapshot(id)); + } catch (Exception ignored) { + // snapshot may have been expired by concurrent operations + } + } + } + } + } catch (Exception e) { + LOG.warn("Failed to read latest snapshot, it might have been expired.", e); + } readSnapshots.addAll(tagManager.taggedSnapshots()); readSnapshots.addAll(changelogManager.safelyGetAllChangelogs()); return readSnapshots; } + /** Collect all files referenced by the latest snapshot of each branch. */ + protected Set collectLatestSnapshotFiles(List branches) { + Set files = new HashSet<>(); + for (String branch : branches) { + try { + FileStoreTable branchTable = table.switchToBranch(branch); + Snapshot latest = branchTable.snapshotManager().latestSnapshot(); + if (latest != null) { + Set manifests = new HashSet<>(); + collectWithoutDataFile(branch, latest, files::add, manifests::add); + ManifestFile manifestFile = branchTable.store().manifestFileFactory().create(); + for (String manifest : manifests) { + retryReadingFiles( + () -> manifestFile.readWithIOException(manifest), + Collections.emptyList()) + .stream() + .map(ManifestEntry::file) + .forEach( + f -> { + files.add(f.fileName()); + f.extraFiles().forEach(files::add); + }); + } + } + } catch (Exception e) { + LOG.warn("Failed to re-read latest snapshot for branch {}", branch, e); + } + } + return files; + } + protected void collectWithoutDataFile( String branch, Snapshot snapshot, diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java index bb20c9904e48..ffb7ad671a26 100644 --- a/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/operation/LocalOrphanFilesCleanTest.java @@ -546,6 +546,55 @@ public void testAbnormallyRemoving() throws Exception { assertThat(orphanFilesClean.clean().getDeletedFilesPath().size()).isGreaterThan(0); } + @Test + public void testCleanAfterSnapshotExpiration() throws Exception { + // write data to create multiple snapshots + List> committedData = new ArrayList<>(); + Map> snapshotData = new HashMap<>(); + SnapshotManager snapshotManager = table.snapshotManager(); + int commitTimes = 10; + writeData(snapshotManager, committedData, snapshotData, new HashMap<>(), commitTimes); + + int snapshotCount = (int) snapshotManager.snapshotCount(); + + // expire some old snapshots + int expired = snapshotCount / 2; + Options expireOptions = new Options(); + expireOptions.set(CoreOptions.SNAPSHOT_EXPIRE_LIMIT, snapshotCount); + expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MIN, snapshotCount - expired); + expireOptions.set(CoreOptions.SNAPSHOT_NUM_RETAINED_MAX, snapshotCount - expired); + table.copy(expireOptions.toMap()).newCommit("").expireSnapshots(); + + // add orphan files to snapshot, changelog, manifest, and data directories + int shouldBeDeleted = 0; + int fileNum = RANDOM.nextInt(5) + 1; + + addNonUsedFiles( + new Path(tablePath, "snapshot"), fileNum, Collections.singletonList("UNKNOWN")); + shouldBeDeleted += fileNum; + + addNonUsedFiles( + new Path(tablePath, "changelog"), fileNum, Collections.singletonList("UNKNOWN")); + shouldBeDeleted += fileNum; + + addNonUsedFiles( + manifestDir, + fileNum, + Arrays.asList("manifest-list-", "manifest-", "index-manifest-", "UNKNOWN-")); + shouldBeDeleted += fileNum; + + shouldBeDeleted += randomlyAddNonUsedDataFiles(tablePath); + + assertThat(manuallyAddedFiles.size()).isEqualTo(shouldBeDeleted); + + // run orphan clean and verify only orphan files are deleted + LocalOrphanFilesClean orphanFilesClean = + new LocalOrphanFilesClean( + table, System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(2)); + List deleted = orphanFilesClean.clean().getDeletedFilesPath(); + validate(deleted, snapshotData, new HashMap<>()); + } + @Test public void testRemovingEmptyDirectories() throws Exception { List> committedData = new ArrayList<>(); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java index d3376cb80cf5..878497ab8d20 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/orphan/FlinkOrphanFilesClean.java @@ -243,6 +243,34 @@ public void endInput() throws IOException { usedFiles = usedFiles.union(usedManifestFiles); + // protect files referenced by the latest snapshot + DataStream latestSnapshotFiles = + env.fromCollection(branches) + .name("branch-source-protect") + .process( + new ProcessFunction() { + @Override + public void processElement( + String branch, + ProcessFunction.Context ctx, + Collector out) { + try { + for (String file : + collectLatestSnapshotFiles( + Collections.singletonList(branch))) { + out.collect(file); + } + } catch (Exception e) { + LOG.warn( + "Failed to collect latest snapshot files for branch {}", + branch, + e); + } + } + }) + .name("protect-latest-snapshot"); + usedFiles = usedFiles.union(latestSnapshotFiles); + final OutputTag emptyDirOutputTag = new OutputTag("empty-dir-output") {}; SingleOutputStreamOperator> candidates = env.fromCollection(Collections.singletonList(1), TypeInformation.of(Integer.class)) diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala index 11f1364c1864..2eccfae682e9 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/procedure/SparkOrphanFilesClean.scala @@ -112,10 +112,12 @@ case class SparkOrphanFilesClean( } } - // union manifest and data files + // union manifest and data files, including files from the latest snapshot + val latestSnapshotFiles = collectLatestSnapshotFiles(branches).asScala.toSeq val usedFiles = usedManifestFiles .map(_.manifestName) .union(dataFiles) + .union(spark.createDataset(latestSnapshotFiles)) .toDF("used_name") // find candidate files which can be removed