diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
index d07c9eb915..e3fd6e3a97 100644
--- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
+++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java
@@ -104,42 +104,53 @@ public long commit(IcebergCommittable committable, Map snapshotP
             // Refresh table to get latest metadata
             icebergTable.refresh();
 
-            SnapshotUpdate<?> snapshotUpdate;
-            if (committable.getDeleteFiles().isEmpty()) {
-                // Simple append-only case: only data files, no delete files or compaction
-                AppendFiles appendFiles = icebergTable.newAppend();
-                committable.getDataFiles().forEach(appendFiles::appendFile);
-                snapshotUpdate = appendFiles;
-            } else {
-                /*
-                Row delta validations are not needed for streaming changes that write equality
-                deletes. Equality deletes are applied to data in all previous sequence numbers,
-                so retries may push deletes further in the future, but do not affect correctness.
-                Position deletes committed to the table in this path are used only to delete rows
-                from data files that are being added in this commit. There is no way for data
-                files added along with the delete files to be concurrently removed, so there is
-                no need to validate the files referenced by the position delete files that are
-                being committed.
-                */
-                RowDelta rowDelta = icebergTable.newRowDelta();
-                committable.getDataFiles().forEach(rowDelta::addRows);
-                committable.getDeleteFiles().forEach(rowDelta::addDeletes);
-                snapshotUpdate = rowDelta;
-            }
+            Long snapshotId = null;
+
+            /*
+            Rewrite files MUST be committed BEFORE data/delete files.
 
-            // commit written files
-            long snapshotId = commit(snapshotUpdate, snapshotProperties);
+            Rewrite operations call validateFromSnapshot(snapshotId) to ensure the files being
+            replaced haven't changed since the rewrite was planned. If we commit data/delete
+            files first (via AppendFiles or RowDelta), the table's current snapshot advances.
+            The subsequent rewrite validation then fails because it's checking against a
+            now-stale snapshot ID, triggering Iceberg's retry loop indefinitely.
 
-            // There exists rewrite files, commit rewrite files
+            By committing rewrites first, the validation succeeds against the current snapshot,
+            and subsequent data/delete commits simply advance the snapshot further.
+            */
             List<RewriteDataFileResult> rewriteDataFileResults =
                     committable.rewriteDataFileResults();
             if (!rewriteDataFileResults.isEmpty()) {
-                Long rewriteCommitSnapshotId =
-                        commitRewrite(rewriteDataFileResults, snapshotProperties);
-                if (rewriteCommitSnapshotId != null) {
-                    snapshotId = rewriteCommitSnapshotId;
+                snapshotId = commitRewrite(rewriteDataFileResults, snapshotProperties);
+            }
+
+            // Commit data/delete files
+            if (!committable.getDataFiles().isEmpty() || !committable.getDeleteFiles().isEmpty()) {
+                SnapshotUpdate<?> snapshotUpdate;
+                if (committable.getDeleteFiles().isEmpty()) {
+                    // Simple append-only case: only data files, no delete files
+                    AppendFiles appendFiles = icebergTable.newAppend();
+                    committable.getDataFiles().forEach(appendFiles::appendFile);
+                    snapshotUpdate = appendFiles;
+                } else {
+                    /*
+                    Row delta validations are not needed for streaming changes that write equality
+                    deletes. Equality deletes are applied to data in all previous sequence numbers,
+                    so retries may push deletes further in the future, but do not affect correctness.
+                    Position deletes committed to the table in this path are used only to delete rows
+                    from data files that are being added in this commit. There is no way for data
+                    files added along with the delete files to be concurrently removed, so there is
+                    no need to validate the files referenced by the position delete files that are
+                    being committed.
+                    */
+                    RowDelta rowDelta = icebergTable.newRowDelta();
+                    committable.getDataFiles().forEach(rowDelta::addRows);
+                    committable.getDeleteFiles().forEach(rowDelta::addDeletes);
+                    snapshotUpdate = rowDelta;
                 }
+                snapshotId = commit(snapshotUpdate, snapshotProperties);
             }
+            return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null.");
         } catch (Exception e) {
             throw new IOException("Failed to commit to Iceberg table.", e);
         }
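
The ordering rationale in the comment above can be illustrated directly against Iceberg's core API. The sketch below is editorial and not part of the patch: the table, plannedSnapshotId and file-set names are placeholders, and the real logic lives in Fluss's commitRewrite()/commit() helpers.

// Editorial sketch (not part of the patch): the commit ordering above, expressed with
// Iceberg's core API. All names are placeholders; Fluss's commitRewrite()/commit() wrap this.
import java.util.Set;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.RowDelta;
import org.apache.iceberg.Table;

final class CommitOrderSketch {

    static void commitRewriteThenDelta(
            Table table,
            long plannedSnapshotId, // snapshot the rewrite was planned against
            Set<DataFile> rewrittenFiles,
            Set<DataFile> compactedFiles,
            Set<DataFile> newDataFiles,
            Set<DeleteFile> newDeleteFiles) {
        // 1) Rewrite first: validateFromSnapshot() is checked against the snapshot the rewrite
        //    was planned on, which is still the table's current snapshot, so it passes.
        table.newRewrite()
                .validateFromSnapshot(plannedSnapshotId)
                .rewriteFiles(rewrittenFiles, compactedFiles)
                .commit();

        // 2) Data/delete files second: the RowDelta (or AppendFiles) merely advances the
        //    snapshot; committing it before the rewrite is what left the rewrite validating
        //    against a stale snapshot and retrying.
        RowDelta rowDelta = table.newRowDelta();
        newDataFiles.forEach(rowDelta::addRows);
        newDeleteFiles.forEach(rowDelta::addDeletes);
        rowDelta.commit();
    }
}
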
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
index 4b6d9de71a..022717bc3f 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java
@@ -126,7 +126,7 @@ private void checkRecords(List actualRows, List expectedRow
     }
 
     @Test
-    void testPkTableCompactionWithConflict() throws Exception {
+    void testPkTableCompactionWithDeletedFiles() throws Exception {
         JobClient jobClient = buildTieringJob(execEnv);
         try {
             TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
@@ -147,17 +147,17 @@ void testPkTableCompactionWithConflict() throws Exception {
             // add pos-delete and trigger compaction
             rows = Arrays.asList(row(4, "v1"), row(4, "v2"));
             flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1));
-            // rewritten files should fail to commit due to conflict, add check here
             checkRecords(getIcebergRecords(t1), flussRows);
-            // 4 data file and 1 delete file
-            checkFileStatusInIcebergTable(t1, 4, true);
+            checkFileStatusInIcebergTable(t1, 2, true);
 
-            // previous compaction conflicts won't prevent further compaction, and check iceberg
-            // records
+            // previous compaction conflicts won't prevent further compaction, and
+            // check iceberg records
             rows = Collections.singletonList(row(5, "v1"));
             flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows));
             checkRecords(getIcebergRecords(t1), flussRows);
-            checkFileStatusInIcebergTable(t1, 2, false);
+            // check that the correct number of files (2 from compaction + 1 new)
+            // exist and that a delete file is present
+            checkFileStatusInIcebergTable(t1, 3, true);
         } finally {
             jobClient.cancel().get();
         }
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
index 99fd340cf4..45e3f5c9ee 100644
--- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java
@@ -358,17 +358,19 @@ protected void checkFileStatusInIcebergTable(
             throws IOException {
         org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
         int count = 0;
+        boolean deleteFileExists = false;
         try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
             for (FileScanTask ignored : tasks) {
-                if (shouldDeleteFileExist) {
-                    assertThat(ignored.deletes()).isNotEmpty();
-                } else {
-                    assertThat(ignored.deletes()).isEmpty();
+                // Not all data files may have associated delete files, so track if
+                // any exist when the shouldDeleteFileExist flag is set
+                if (shouldDeleteFileExist && !deleteFileExists) {
+                    deleteFileExists = !ignored.deletes().isEmpty();
                 }
                 count++;
             }
         }
         assertThat(count).isEqualTo(expectedFileCount);
+        assertThat(deleteFileExists).isEqualTo(shouldDeleteFileExist);
     }
 
     protected void checkDataInIcebergAppendOnlyPartitionedTable(
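
The relaxed helper above now only requires that at least one scan task carries delete files. To reproduce the counts asserted in IcebergRewriteITCase by hand, a standalone walk over the scan tasks looks roughly like this (editorial sketch, not part of the patch):

// Editorial sketch (not part of the patch): tally data files and the delete files attached
// to them, which is what checkFileStatusInIcebergTable() asserts in relaxed form.
import java.io.IOException;

import org.apache.iceberg.FileScanTask;
import org.apache.iceberg.Table;
import org.apache.iceberg.io.CloseableIterable;

final class FileStatusSketch {

    static void printFileStatus(Table table) throws IOException {
        int dataFiles = 0;
        int deleteFileRefs = 0;
        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
            for (FileScanTask task : tasks) {
                dataFiles++;
                // task.deletes() lists the delete files applied to this task's data file;
                // a delete file that covers several data files is counted once per task here.
                deleteFileRefs += task.deletes().size();
            }
        }
        System.out.printf("data files: %d, delete file references: %d%n", dataFiles, deleteFileRefs);
    }
}
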
diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java
new file mode 100644
index 0000000000..9811a63b93
--- /dev/null
+++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java
@@ -0,0 +1,388 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.fluss.lake.iceberg.tiering;
+
+import org.apache.fluss.config.Configuration;
+import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult;
+import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory;
+import org.apache.fluss.metadata.TablePath;
+
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DeleteFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileMetadata;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.StructLike;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.SupportsNamespaces;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.types.Types;
+import org.apache.iceberg.util.DataFileSet;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.OffsetDateTime;
+import java.time.ZoneOffset;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg;
+import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME;
+import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME;
+import static org.apache.fluss.utils.Preconditions.checkState;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** UT for {@link IcebergLakeCommitter} operations. */
+class IcebergLakeCommitterTest {
+
+    private @TempDir File tempWarehouseDir;
+    private Catalog icebergCatalog;
+    private IcebergCatalogProvider catalogProvider;
+
+    @BeforeEach
+    void setUp() {
+        Configuration configuration = new Configuration();
+        configuration.setString("warehouse", "file://" + tempWarehouseDir);
+        configuration.setString("type", "hadoop");
+        configuration.setString("name", "test");
+        catalogProvider = new IcebergCatalogProvider(configuration);
+        icebergCatalog = catalogProvider.get();
+    }
+
+    @Test
+    void testRewriteWithDataFilesSucceeds() throws Exception {
+        // Create table with minimal retry configuration to fail fast instead of blocking
+        Map<String, String> tableProps = new HashMap<>();
+        tableProps.put("commit.retry.num-retries", "2");
+        tableProps.put("commit.retry.min-wait-ms", "10");
+        tableProps.put("commit.retry.max-wait-ms", "100");
+
+        TablePath tablePath = TablePath.of("iceberg", "data_rewrite_conflict_table");
+        createTable(tablePath, tableProps);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+
+        // Write 3 initial data files
+        appendDataFiles(icebergTable, 3, 10, 0, bucket);
+        icebergTable.refresh();
+
+        int initialFileCount = countDataFiles(icebergTable);
+        assertThat(initialFileCount).isEqualTo(3);
+
+        long rewriteSnapshotId = icebergTable.currentSnapshot().snapshotId();
+
+        // Create the rewrite result: compacting the 3 files into 1
+        DataFileSet filesToDelete = DataFileSet.create();
+        icebergTable.newScan().planFiles().forEach(task -> filesToDelete.add(task.file()));
+        assertThat(filesToDelete).hasSize(3);
+
+        DataFileSet filesToAdd = DataFileSet.create();
+        filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket));
+
+        RewriteDataFileResult rewriteResult =
+                new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd);
+
+        // Create new data file (NO delete files - simple append path)
+        DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket);
+
+        // Build committable with ONLY data file + rewrite (no delete files)
+        IcebergCommittable committable =
+                IcebergCommittable.builder()
+                        .addDataFile(newDataFile)
+                        .addRewriteDataFileResult(rewriteResult)
+                        .build();
+
+        // Perform the commit operation
+        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
+        committer.commit(committable, Collections.emptyMap());
+        committer.close();
+
+        // Verify that commit and rewrite worked as expected
+        icebergTable.refresh();
+        int finalFileCount = countDataFiles(icebergTable);
+        assertThat(finalFileCount)
+                .as("With data files only, rewrite succeeds - got %d files", finalFileCount)
+                .isEqualTo(2);
+    }
+
+    @Test
+    void testRewriteOnlyCommitSucceeds() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "rewrite_only_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+
+        // Seed with files to rewrite
+        appendDataFiles(icebergTable, 3, 10, 0, bucket);
+        icebergTable.refresh();
+
+        long snapshotId = icebergTable.currentSnapshot().snapshotId();
+
+        // Collect files to delete
+        DataFileSet filesToDelete = DataFileSet.create();
+        icebergTable.newScan().planFiles().forEach(task -> filesToDelete.add(task.file()));
+
+        // Create compacted replacement file
+        DataFileSet filesToAdd = DataFileSet.create();
+        filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket));
+
+        RewriteDataFileResult rewriteResult =
+                new RewriteDataFileResult(snapshotId, filesToDelete, filesToAdd);
+
+        // Committable with ONLY rewrite results, no new data files
+        IcebergCommittable committable =
+                IcebergCommittable.builder().addRewriteDataFileResult(rewriteResult).build();
+
+        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
+
+        // This should complete successfully and create a new snapshot
+        long resultSnapshotId = committer.commit(committable, Collections.emptyMap());
+        assertThat(resultSnapshotId).isNotEqualTo(snapshotId);
+
+        // Verify the rewrite completed (should have 1 file instead of 3)
+        icebergTable.refresh();
+        int finalFileCount = countDataFiles(icebergTable);
+        assertThat(finalFileCount).isEqualTo(1);
+
+        committer.close();
+    }
+
+    @Test
+    void testCommitSucceeds() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "data_only_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+
+        DataFile newDataFile = writeDataFile(icebergTable, 10, 0, bucket);
+
+        IcebergCommittable committable =
+                IcebergCommittable.builder().addDataFile(newDataFile).build();
+
+        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
+
+        long snapshotId = committer.commit(committable, Collections.emptyMap());
+        assertThat(snapshotId).isGreaterThan(0);
+
+        committer.close();
+    }
+
+    @Test
+    void testCommitWithDeleteFilesSucceeds() throws Exception {
+        TablePath tablePath = TablePath.of("iceberg", "data_delete_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+
+        // First, create some initial data
+        appendDataFiles(icebergTable, 1, 10, 0, bucket);
+        icebergTable.refresh();
+
+        DataFile existingFile = icebergTable.newScan().planFiles().iterator().next().file();
+
+        // Create new data file and a delete file (simulates PK table update without compaction)
+        DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket);
+        DeleteFile deleteFile =
+                createDeleteFile(icebergTable, existingFile.partition(), existingFile.location());
+
+        IcebergCommittable committable =
+                IcebergCommittable.builder()
+                        .addDataFile(newDataFile)
+                        .addDeleteFile(deleteFile)
+                        .build();
+
+        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
+
+        long snapshotId = committer.commit(committable, Collections.emptyMap());
+        assertThat(snapshotId).isGreaterThan(0);
+
+        // Verify that we have 2 data files (1 original + 1 new)
+        icebergTable.refresh();
+        int finalFileCount = countDataFiles(icebergTable);
+        assertThat(finalFileCount).isEqualTo(2);
+
+        committer.close();
+    }
+
+    @Test
+    void testRewriteWithDeleteFilesInSameCycleSucceeds() throws Exception {
+        // Create table and seed with initial data files
+        TablePath tablePath = TablePath.of("iceberg", "conflict_test_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+
+        // Write 3 initial data files that will be the target of the rewrite
+        appendDataFiles(icebergTable, 3, 10, 0, bucket);
+        icebergTable.refresh();
+
+        int initialFileCount = countDataFiles(icebergTable);
+        assertThat(initialFileCount).isEqualTo(3);
+
+        // Capture the current snapshot ID - this is what the rewrite will validate against
+        long rewriteSnapshotId = icebergTable.currentSnapshot().snapshotId();
+
+        // Collect the data files to be rewritten and get one to reference in delete file
+        DataFileSet filesToDelete = DataFileSet.create();
+        DataFile firstDataFile = null;
+        for (FileScanTask task : icebergTable.newScan().planFiles()) {
+            filesToDelete.add(task.file());
+            if (firstDataFile == null) {
+                firstDataFile = task.file();
+            }
+        }
+        assertThat(filesToDelete).hasSize(3);
+        assertThat(firstDataFile).isNotNull();
+
+        // Create the rewrite result: compacting the 3 files into 1
+        DataFileSet filesToAdd = DataFileSet.create();
+        filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket));
+
+        RewriteDataFileResult rewriteResult =
+                new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd);
+
+        // Create a delete file that references one of the data files being rewritten.
+        // IMPORTANT: if the RowDelta committed this delete file first, it would conflict with
+        // the rewrite's validation, since the file would then have pending deletes
+        DeleteFile deleteFile =
+                createDeleteFile(icebergTable, firstDataFile.partition(), firstDataFile.location());
+
+        // Create new data file to include in the commit
+        DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket);
+
+        // Build a committable with data file, delete file, AND rewrite results
+        // The presence of a delete file routes the data/delete commit through the RowDelta path
+        IcebergCommittable committable =
+                IcebergCommittable.builder()
+                        .addDataFile(newDataFile)
+                        .addDeleteFile(deleteFile)
+                        .addRewriteDataFileResult(rewriteResult)
+                        .build();
+
+        // Perform the commit operation
+        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
+        committer.commit(committable, Collections.emptyMap());
+        committer.close();
+
+        // Verify that rewrite succeeded
+        icebergTable.refresh();
+        int finalFileCount = countDataFiles(icebergTable);
+        assertThat(finalFileCount)
+                .as("Rewrite should succeed with delete files present")
+                .isEqualTo(2);
+    }
+
+    // Helper methods
+
+    private void createTable(TablePath tablePath) {
+        createTable(tablePath, Collections.emptyMap());
+    }
+
+    private void createTable(TablePath tablePath, Map<String, String> properties) {
+        Namespace namespace = Namespace.of(tablePath.getDatabaseName());
+        SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog;
+        if (!ns.namespaceExists(namespace)) {
+            ns.createNamespace(namespace);
+        }
+
+        Schema schema =
+                new Schema(
+                        Types.NestedField.optional(1, "c1", Types.IntegerType.get()),
+                        Types.NestedField.optional(2, "c2", Types.StringType.get()),
+                        Types.NestedField.required(3, BUCKET_COLUMN_NAME, Types.IntegerType.get()),
+                        Types.NestedField.required(4, OFFSET_COLUMN_NAME, Types.LongType.get()),
+                        Types.NestedField.required(
+                                5, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone()));
+
+        PartitionSpec partitionSpec =
+                PartitionSpec.builderFor(schema).identity(BUCKET_COLUMN_NAME).build();
+        TableIdentifier tableId =
+                TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName());
+        icebergCatalog.createTable(tableId, schema, partitionSpec, properties);
+    }
+
+    private void appendDataFiles(
+            Table table, int numFiles, int rowsPerFile, int baseOffset, int bucket)
+            throws Exception {
+        AppendFiles append = table.newAppend();
+        for (int i = 0; i < numFiles; i++) {
+            append.appendFile(
+                    writeDataFile(table, rowsPerFile, baseOffset + (i * rowsPerFile), bucket));
+        }
+        append.commit();
+    }
+
+    private DataFile writeDataFile(Table table, int rows, int startOffset, int bucket)
+            throws Exception {
+        try (TaskWriter<Record> taskWriter =
+                TaskWriterFactory.createTaskWriter(table, null, bucket)) {
+            for (int i = 0; i < rows; i++) {
+                Record r = GenericRecord.create(table.schema());
+                r.setField("c1", i);
+                r.setField("c2", "value_" + i);
+                r.setField(BUCKET_COLUMN_NAME, bucket);
+                r.setField(OFFSET_COLUMN_NAME, (long) (startOffset + i));
+                r.setField(TIMESTAMP_COLUMN_NAME, OffsetDateTime.now(ZoneOffset.UTC));
+                taskWriter.write(r);
+            }
+            DataFile[] dataFiles = taskWriter.dataFiles();
+            checkState(dataFiles.length == 1);
+            return dataFiles[0];
+        }
+    }
+
+    private int countDataFiles(Table table) throws IOException {
+        int count = 0;
+        try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
+            for (FileScanTask ignored : tasks) {
+                count++;
+            }
+        }
+        return count;
+    }
+
+    private DeleteFile createDeleteFile(
+            Table table, StructLike partition, CharSequence dataFilePath) {
+        // Create a position delete file that references the given data file
+        // This simulates a delete operation on the same file being rewritten
+        return FileMetadata.deleteFileBuilder(table.spec())
+                .withPath(dataFilePath.toString() + ".deletes.parquet")
+                .withFileSizeInBytes(1024)
+                .withPartition(partition)
+                .withRecordCount(1)
+                .withFormat(FileFormat.PARQUET)
+                .ofPositionDeletes()
+                .build();
+    }
+}
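
testRewriteWithDeleteFilesInSameCycleSucceeds covers the interesting end-to-end case: a delete file referencing a data file that the same committable also rewrites. For orientation, the inverted ordering that the committer comment warns about could be sketched inside this test class roughly as follows; it reuses the helpers above, is not part of the patch, and the exact failure mode (a validation error versus repeated commit retries) depends on the Iceberg version:

// Editorial sketch (not part of the patch), written as if it were another method of
// IcebergLakeCommitterTest (it would additionally need an import for org.apache.iceberg.RowDelta):
// commit the RowDelta first and then attempt the rewrite that was planned against the earlier
// snapshot - the situation the committer's ordering avoids.
void sketchInvertedCommitOrder(Table icebergTable) throws Exception {
    int bucket = 0;
    appendDataFiles(icebergTable, 3, 10, 0, bucket);
    icebergTable.refresh();

    // The rewrite is planned against this snapshot...
    long plannedSnapshotId = icebergTable.currentSnapshot().snapshotId();
    DataFileSet filesToDelete = DataFileSet.create();
    icebergTable.newScan().planFiles().forEach(task -> filesToDelete.add(task.file()));
    DataFileSet filesToAdd = DataFileSet.create();
    filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket));
    DataFile victim = filesToDelete.iterator().next();

    // ...but a RowDelta lands first and attaches a delete to a file the rewrite replaces.
    RowDelta rowDelta = icebergTable.newRowDelta();
    rowDelta.addRows(writeDataFile(icebergTable, 5, 100, bucket));
    rowDelta.addDeletes(createDeleteFile(icebergTable, victim.partition(), victim.location()));
    rowDelta.commit();

    // The rewrite now validates from a stale snapshot; per the rationale in
    // IcebergLakeCommitter this is what used to leave tiering stuck in Iceberg's retry loop.
    try {
        icebergTable
                .newRewrite()
                .validateFromSnapshot(plannedSnapshotId)
                .rewriteFiles(filesToDelete, filesToAdd)
                .commit();
    } catch (RuntimeException e) {
        System.out.println("rewrite after RowDelta was rejected: " + e.getMessage());
    }
}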