From e9ad4e4fc158dea1fc8409f058eb4da113cfe78a Mon Sep 17 00:00:00 2001 From: Rion Williams Date: Tue, 20 Jan 2026 14:50:50 -0600 Subject: [PATCH 1/6] [lake/iceberg] Added IcebergLakeCommitter test suite (including conflict reproducibility test) --- .../tiering/IcebergLakeCommitterTest.java | 409 ++++++++++++++++++ 1 file changed, 409 insertions(+) create mode 100644 fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java new file mode 100644 index 0000000000..8feb8d75a3 --- /dev/null +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java @@ -0,0 +1,409 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.fluss.lake.iceberg.tiering; + +import org.apache.fluss.config.Configuration; +import org.apache.fluss.lake.iceberg.maintenance.RewriteDataFileResult; +import org.apache.fluss.lake.iceberg.tiering.writer.TaskWriterFactory; +import org.apache.fluss.metadata.TablePath; + +import org.apache.iceberg.AppendFiles; +import org.apache.iceberg.DataFile; +import org.apache.iceberg.DeleteFile; +import org.apache.iceberg.FileFormat; +import org.apache.iceberg.FileMetadata; +import org.apache.iceberg.FileScanTask; +import org.apache.iceberg.PartitionSpec; +import org.apache.iceberg.Schema; +import org.apache.iceberg.StructLike; +import org.apache.iceberg.Table; +import org.apache.iceberg.catalog.Catalog; +import org.apache.iceberg.catalog.Namespace; +import org.apache.iceberg.catalog.SupportsNamespaces; +import org.apache.iceberg.catalog.TableIdentifier; +import org.apache.iceberg.data.GenericRecord; +import org.apache.iceberg.data.Record; +import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.io.TaskWriter; +import org.apache.iceberg.types.Types; +import org.apache.iceberg.util.DataFileSet; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; + +import java.io.File; +import java.io.IOException; +import java.time.OffsetDateTime; +import java.time.ZoneOffset; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static org.apache.fluss.lake.iceberg.utils.IcebergConversions.toIceberg; +import static org.apache.fluss.metadata.TableDescriptor.BUCKET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.OFFSET_COLUMN_NAME; +import static org.apache.fluss.metadata.TableDescriptor.TIMESTAMP_COLUMN_NAME; +import static 
org.apache.fluss.utils.Preconditions.checkState; +import static org.assertj.core.api.Assertions.assertThat; + +/** UT for {@link IcebergLakeCommitter} operations. */ +class IcebergLakeCommitterTest { + + private @TempDir File tempWarehouseDir; + private Catalog icebergCatalog; + private IcebergCatalogProvider catalogProvider; + + @BeforeEach + void setUp() { + Configuration configuration = new Configuration(); + configuration.setString("warehouse", "file://" + tempWarehouseDir); + configuration.setString("type", "hadoop"); + configuration.setString("name", "test"); + catalogProvider = new IcebergCatalogProvider(configuration); + icebergCatalog = catalogProvider.get(); + } + + /** + * Reproduces the rewrite conflict bug documented in GitHub issue #2420. + * + *
+     * <p>This test reproduces the scenario from IcebergRewriteITCase where PK table writes with
+     * auto-compaction enabled cause blocking. The flow is:
+     *
+     * <ol>
+     *   <li>IcebergLakeWriter writes data to a PK table, generating delete files for updates
+     *   <li>Auto-compaction runs concurrently, producing RewriteDataFileResult
+     *   <li>complete() combines both: WriteResult (data + delete files) + RewriteDataFileResult
+     *   <li>IcebergLakeCommitter.commit() receives both in the same cycle
+     * </ol>
+     *
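+     * <p>A minimal sketch of the two colliding Iceberg operations (illustrative only; the
+     * variable names echo this test, and the actual commit calls live in IcebergLakeCommitter):
+     *
+     * <pre>{@code
+     * // RowDelta commits the data + delete files first, moving the snapshot from N to N+1
+     * icebergTable.newRowDelta().addRows(newDataFile).addDeletes(deleteFile).commit();
+     *
+     * // The rewrite was planned against snapshot N, which is now permanently stale
+     * icebergTable.newRewriteFiles()
+     *         .validateFromSnapshot(rewriteSnapshotId) // still N
+     *         .rewriteFiles(filesToDelete, filesToAdd)
+     *         .commit(); // validation keeps failing and retrying until the timeout expires
+     * }</pre>
+     *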
+     * <p>Bug behavior: RowDelta commits first (because delete files are present), advancing the
+     * snapshot from N to N+1. The rewrite then calls validateFromSnapshot(N), but N is now stale.
+     * Iceberg's retry loop detects the mismatch and retries, but the conflict is permanent because
+     * the validation snapshot is fixed. With default Iceberg settings (commit.retry.total-timeout-ms
+     * = 30 minutes), this causes blocking for 15+ minutes before failing.
+     *
+     */
+    @Test
+    void testRewriteFailsWhenDeleteFilesAddedInSameCycle() throws Exception {
+        // Create table and seed with initial data files
+        TablePath tablePath = TablePath.of("iceberg", "conflict_test_table");
+        createTable(tablePath);
+        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
+        int bucket = 0;
+
+        // Write 3 initial data files that will be the target of the rewrite
+        appendDataFiles(icebergTable, 3, 10, 0, bucket);
+        icebergTable.refresh();
+
+        int initialFileCount = countDataFiles(icebergTable);
+        assertThat(initialFileCount).isEqualTo(3);
+
+        // Capture the current snapshot ID - this is what the rewrite will validate against
+        long rewriteSnapshotId = icebergTable.currentSnapshot().snapshotId();
+
+        // Collect the data files to be rewritten and get one to reference in delete file
+        DataFileSet filesToDelete = DataFileSet.create();
+        DataFile firstDataFile = null;
+        for (FileScanTask task : icebergTable.newScan().planFiles()) {
+            filesToDelete.add(task.file());
+            if (firstDataFile == null) {
+                firstDataFile = task.file();
+            }
+        }
+        assertThat(filesToDelete).hasSize(3);
+        assertThat(firstDataFile).isNotNull();
+
+        // Create the rewrite result: compacting the 3 files into 1
+        DataFileSet filesToAdd = DataFileSet.create();
+        filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket));
+
+        RewriteDataFileResult rewriteResult =
+                new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd);
+
+        // Create a delete file that references one of the data files being rewritten
+        // IMPORTANT: when RowDelta commits this delete file, it will conflict with the rewrite's validation
+        // since the file now has pending deletes
+        DeleteFile deleteFile =
+                createDeleteFile(icebergTable, firstDataFile.partition(), firstDataFile.location());
+
+        // Create new data file to include in the commit
+        DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket);
+
+        // Build a committable with data file, delete file, AND rewrite results
+        // The presence of delete file triggers RowDelta path which commits first
+        IcebergCommittable committable =
+                IcebergCommittable.builder()
+                        .addDataFile(newDataFile)
+                        .addDeleteFile(deleteFile)
+                        .addRewriteDataFileResult(rewriteResult)
+                        .build();
+
+        // Perform the commit operation (to trigger the bug)
+        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
+        committer.commit(committable, Collections.emptyMap());
+        committer.close();
+
+        // Verify that the rewrite failed silently, leaving incorrect file counts
+        icebergTable.refresh();
+        int finalFileCount = countDataFiles(icebergTable);
+        assertThat(finalFileCount)
+                .as(
+                        "Rewrite failed silently. 
Expecting 4 files " + + "(3 original + 1 new) to demonstrate bug, should otherwise be 2 (due to compaction).") + .isEqualTo(4); + } + + @Test + void testRewriteWithDataFilesSucceeds() throws Exception { + // Create table with minimal retry configuration to fail fast instead of blocking + Map tableProps = new HashMap<>(); + tableProps.put("commit.retry.num-retries", "2"); + tableProps.put("commit.retry.min-wait-ms", "10"); + tableProps.put("commit.retry.max-wait-ms", "100"); + + TablePath tablePath = TablePath.of("iceberg", "data_rewrite_conflict_table"); + createTable(tablePath, tableProps); + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int bucket = 0; + + // Write 3 initial data files + appendDataFiles(icebergTable, 3, 10, 0, bucket); + icebergTable.refresh(); + + int initialFileCount = countDataFiles(icebergTable); + assertThat(initialFileCount).isEqualTo(3); + + long rewriteSnapshotId = icebergTable.currentSnapshot().snapshotId(); + + // Create the rewrite result: compacting the 3 files into 1 + DataFileSet filesToDelete = DataFileSet.create(); + icebergTable.newScan().planFiles().forEach(task -> filesToDelete.add(task.file())); + assertThat(filesToDelete).hasSize(3); + + DataFileSet filesToAdd = DataFileSet.create(); + filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket)); + + RewriteDataFileResult rewriteResult = + new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd); + + // Create new data file (NO delete files - simple append path) + DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket); + + // Build committable with ONLY data file + rewrite (no delete files) + IcebergCommittable committable = + IcebergCommittable.builder() + .addDataFile(newDataFile) + .addRewriteDataFileResult(rewriteResult) + .build(); + + // Perform the commit operation + IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath); + committer.commit(committable, Collections.emptyMap()); + committer.close(); + + // Verify that commit and rewrite worked as expected + icebergTable.refresh(); + int finalFileCount = countDataFiles(icebergTable); + assertThat(finalFileCount) + .as("With data files only, rewrite succeeds - got %d files", finalFileCount) + .isEqualTo(2); + } + + @Test + void testRewriteOnlyCommitSucceeds() throws Exception { + TablePath tablePath = TablePath.of("iceberg", "rewrite_only_table"); + createTable(tablePath); + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int bucket = 0; + + // Seed with files to rewrite + appendDataFiles(icebergTable, 3, 10, 0, bucket); + icebergTable.refresh(); + + long snapshotId = icebergTable.currentSnapshot().snapshotId(); + + // Collect files to delete + DataFileSet filesToDelete = DataFileSet.create(); + icebergTable.newScan().planFiles().forEach(task -> filesToDelete.add(task.file())); + + // Create compacted replacement file + DataFileSet filesToAdd = DataFileSet.create(); + filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket)); + + RewriteDataFileResult rewriteResult = + new RewriteDataFileResult(snapshotId, filesToDelete, filesToAdd); + + // Committable with ONLY rewrite results, no new data files + IcebergCommittable committable = + IcebergCommittable.builder().addRewriteDataFileResult(rewriteResult).build(); + + IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath); + + // This should complete successfully and create a new snapshot + long resultSnapshotId = committer.commit(committable, Collections.emptyMap()); 
+ assertThat(resultSnapshotId).isNotEqualTo(snapshotId); + + // Verify the rewrite completed (should have 1 file instead of 3) + icebergTable.refresh(); + int finalFileCount = countDataFiles(icebergTable); + assertThat(finalFileCount).isEqualTo(1); + + committer.close(); + } + + @Test + void testCommitSucceeds() throws Exception { + TablePath tablePath = TablePath.of("iceberg", "data_only_table"); + createTable(tablePath); + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int bucket = 0; + + DataFile newDataFile = writeDataFile(icebergTable, 10, 0, bucket); + + IcebergCommittable committable = + IcebergCommittable.builder().addDataFile(newDataFile).build(); + + IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath); + + long snapshotId = committer.commit(committable, Collections.emptyMap()); + assertThat(snapshotId).isGreaterThan(0); + + committer.close(); + } + + /** Tests that committing data files with delete files (no rewrite) succeeds. */ + @Test + void testCommitWithDeleteFilesSucceeds() throws Exception { + TablePath tablePath = TablePath.of("iceberg", "data_delete_table"); + createTable(tablePath); + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int bucket = 0; + + // First, create some initial data + appendDataFiles(icebergTable, 1, 10, 0, bucket); + icebergTable.refresh(); + + DataFile existingFile = icebergTable.newScan().planFiles().iterator().next().file(); + + // Create new data file and a delete file (simulates PK table update without compaction) + DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket); + DeleteFile deleteFile = + createDeleteFile(icebergTable, existingFile.partition(), existingFile.location()); + + IcebergCommittable committable = + IcebergCommittable.builder() + .addDataFile(newDataFile) + .addDeleteFile(deleteFile) + .build(); + + IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath); + + long snapshotId = committer.commit(committable, Collections.emptyMap()); + assertThat(snapshotId).isGreaterThan(0); + + // Verify that we should have 2 data files (1 original + 1 new) + icebergTable.refresh(); + int finalFileCount = countDataFiles(icebergTable); + assertThat(finalFileCount).isEqualTo(2); + + committer.close(); + } + + // Helper methods + + private void createTable(TablePath tablePath) { + createTable(tablePath, Collections.emptyMap()); + } + + private void createTable(TablePath tablePath, Map properties) { + Namespace namespace = Namespace.of(tablePath.getDatabaseName()); + SupportsNamespaces ns = (SupportsNamespaces) icebergCatalog; + if (!ns.namespaceExists(namespace)) { + ns.createNamespace(namespace); + } + + Schema schema = + new Schema( + Types.NestedField.optional(1, "c1", Types.IntegerType.get()), + Types.NestedField.optional(2, "c2", Types.StringType.get()), + Types.NestedField.required(3, BUCKET_COLUMN_NAME, Types.IntegerType.get()), + Types.NestedField.required(4, OFFSET_COLUMN_NAME, Types.LongType.get()), + Types.NestedField.required( + 5, TIMESTAMP_COLUMN_NAME, Types.TimestampType.withZone())); + + PartitionSpec partitionSpec = + PartitionSpec.builderFor(schema).identity(BUCKET_COLUMN_NAME).build(); + TableIdentifier tableId = + TableIdentifier.of(tablePath.getDatabaseName(), tablePath.getTableName()); + icebergCatalog.createTable(tableId, schema, partitionSpec, properties); + } + + private void appendDataFiles( + Table table, int numFiles, int rowsPerFile, int baseOffset, int bucket) + throws Exception { + AppendFiles 
append = table.newAppend(); + for (int i = 0; i < numFiles; i++) { + append.appendFile( + writeDataFile(table, rowsPerFile, baseOffset + (i * rowsPerFile), bucket)); + } + append.commit(); + } + + private DataFile writeDataFile(Table table, int rows, int startOffset, int bucket) + throws Exception { + try (TaskWriter taskWriter = + TaskWriterFactory.createTaskWriter(table, null, bucket)) { + for (int i = 0; i < rows; i++) { + Record r = GenericRecord.create(table.schema()); + r.setField("c1", i); + r.setField("c2", "value_" + i); + r.setField(BUCKET_COLUMN_NAME, bucket); + r.setField(OFFSET_COLUMN_NAME, (long) (startOffset + i)); + r.setField(TIMESTAMP_COLUMN_NAME, OffsetDateTime.now(ZoneOffset.UTC)); + taskWriter.write(r); + } + DataFile[] dataFiles = taskWriter.dataFiles(); + checkState(dataFiles.length == 1); + return dataFiles[0]; + } + } + + private int countDataFiles(Table table) throws IOException { + int count = 0; + try (CloseableIterable tasks = table.newScan().planFiles()) { + for (FileScanTask ignored : tasks) { + count++; + } + } + return count; + } + + private DeleteFile createDeleteFile(Table table, StructLike partition, CharSequence dataFilePath) { + // Create a position delete file that references the given data file + // This simulates a delete operation on the same file being rewritten + return FileMetadata.deleteFileBuilder(table.spec()) + .withPath(dataFilePath.toString() + ".deletes.parquet") + .withFileSizeInBytes(1024) + .withPartition(partition) + .withRecordCount(1) + .withFormat(FileFormat.PARQUET) + .ofPositionDeletes() + .build(); + } +} From c44d13d3b66b1b8f947d724a19291f6b071a3a37 Mon Sep 17 00:00:00 2001 From: Rion Williams Date: Tue, 20 Jan 2026 15:51:13 -0600 Subject: [PATCH 2/6] [lake/iceberg] spotless:apply --- .../iceberg/tiering/IcebergLakeCommitterTest.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java index 8feb8d75a3..6ea47524ab 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java @@ -95,9 +95,9 @@ void setUp() { *
      * <p>
Bug behavior: RowDelta commits first (because delete files are present), advancing the * snapshot from N to N+1. The rewrite then calls validateFromSnapshot(N), but N is now stale. * Iceberg's retry loop detects the mismatch and retries, but the conflict is permanent because - * the validation snapshot is fixed. With default Iceberg settings (commit.retry.total-timeout-ms - * = 30 minutes), this causes blocking for 15+ minutes before failing. - * + * the validation snapshot is fixed. With default Iceberg settings + * (commit.retry.total-timeout-ms = 30 minutes), this causes blocking for 15+ minutes before + * failing. */ @Test void testRewriteFailsWhenDeleteFilesAddedInSameCycle() throws Exception { @@ -137,7 +137,8 @@ void testRewriteFailsWhenDeleteFilesAddedInSameCycle() throws Exception { new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd); // Create a delete file that references one of the data files being rewritten - // IMPORTANT: when RowDelta commits this delete file, it will conflict with the rewrite's validation + // IMPORTANT: when RowDelta commits this delete file, it will conflict with the rewrite's + // validation // since the file now has pending deletes DeleteFile deleteFile = createDeleteFile(icebergTable, firstDataFile.partition(), firstDataFile.location()); @@ -394,7 +395,8 @@ private int countDataFiles(Table table) throws IOException { return count; } - private DeleteFile createDeleteFile(Table table, StructLike partition, CharSequence dataFilePath) { + private DeleteFile createDeleteFile( + Table table, StructLike partition, CharSequence dataFilePath) { // Create a position delete file that references the given data file // This simulates a delete operation on the same file being rewritten return FileMetadata.deleteFileBuilder(table.spec()) From 7c30ad6cc7af442edb395833aa725e43d5f2b565 Mon Sep 17 00:00:00 2001 From: Rion Williams Date: Tue, 20 Jan 2026 21:54:17 -0600 Subject: [PATCH 3/6] [lake/iceberg] Update IcebergLakeCommiter commit operation ordering to avoid blocking behavior [lake/iceberg] Update IcebergLakeCommiter commit operation ordering to avoid blocking behavior --- .../iceberg/tiering/IcebergLakeCommitter.java | 62 +++---- .../tiering/IcebergLakeCommitterTest.java | 161 ++++++++---------- 2 files changed, 101 insertions(+), 122 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java index d07c9eb915..d974a5a31e 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java @@ -104,42 +104,44 @@ public long commit(IcebergCommittable committable, Map snapshotP // Refresh table to get latest metadata icebergTable.refresh(); - SnapshotUpdate snapshotUpdate; - if (committable.getDeleteFiles().isEmpty()) { - // Simple append-only case: only data files, no delete files or compaction - AppendFiles appendFiles = icebergTable.newAppend(); - committable.getDataFiles().forEach(appendFiles::appendFile); - snapshotUpdate = appendFiles; - } else { - /* - Row delta validations are not needed for streaming changes that write equality - deletes. Equality deletes are applied to data in all previous sequence numbers, - so retries may push deletes further in the future, but do not affect correctness. 
- Position deletes committed to the table in this path are used only to delete rows - from data files that are being added in this commit. There is no way for data - files added along with the delete files to be concurrently removed, so there is - no need to validate the files referenced by the position delete files that are - being committed. - */ - RowDelta rowDelta = icebergTable.newRowDelta(); - committable.getDataFiles().forEach(rowDelta::addRows); - committable.getDeleteFiles().forEach(rowDelta::addDeletes); - snapshotUpdate = rowDelta; - } - - // commit written files - long snapshotId = commit(snapshotUpdate, snapshotProperties); + Long snapshotId = null; - // There exists rewrite files, commit rewrite files + // Since rewrite operations validate against a specific snapshot ID, it's important + // to commit rewrite files BEFORE data/delete files to avoid the snapshot advancing + // too soon (causing rewrite validation to fail) List rewriteDataFileResults = committable.rewriteDataFileResults(); if (!rewriteDataFileResults.isEmpty()) { - Long rewriteCommitSnapshotId = - commitRewrite(rewriteDataFileResults, snapshotProperties); - if (rewriteCommitSnapshotId != null) { - snapshotId = rewriteCommitSnapshotId; + snapshotId = commitRewrite(rewriteDataFileResults, snapshotProperties); + } + + // Commit data/delete files + if (!committable.getDataFiles().isEmpty() || !committable.getDeleteFiles().isEmpty()) { + SnapshotUpdate snapshotUpdate; + if (committable.getDeleteFiles().isEmpty()) { + // Simple append-only case: only data files, no delete files + AppendFiles appendFiles = icebergTable.newAppend(); + committable.getDataFiles().forEach(appendFiles::appendFile); + snapshotUpdate = appendFiles; + } else { + /* + Row delta validations are not needed for streaming changes that write equality + deletes. Equality deletes are applied to data in all previous sequence numbers, + so retries may push deletes further in the future, but do not affect correctness. + Position deletes committed to the table in this path are used only to delete rows + from data files that are being added in this commit. There is no way for data + files added along with the delete files to be concurrently removed, so there is + no need to validate the files referenced by the position delete files that are + being committed. + */ + RowDelta rowDelta = icebergTable.newRowDelta(); + committable.getDataFiles().forEach(rowDelta::addRows); + committable.getDeleteFiles().forEach(rowDelta::addDeletes); + snapshotUpdate = rowDelta; } + snapshotId = commit(snapshotUpdate, snapshotProperties); } + return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null."); } catch (Exception e) { throw new IOException("Failed to commit to Iceberg table.", e); diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java index 6ea47524ab..a804fdcbcc 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java @@ -79,97 +79,6 @@ void setUp() { icebergCatalog = catalogProvider.get(); } - /** - * Reproduces the rewrite conflict bug documented in GitHub issue #2420. - * - *
-     * <p>This test reproduces the scenario from IcebergRewriteITCase where PK table writes with
-     * auto-compaction enabled cause blocking. The flow is:
-     *
-     * <ol>
-     *   <li>IcebergLakeWriter writes data to a PK table, generating delete files for updates
-     *   <li>Auto-compaction runs concurrently, producing RewriteDataFileResult
-     *   <li>complete() combines both: WriteResult (data + delete files) + RewriteDataFileResult
-     *   <li>IcebergLakeCommitter.commit() receives both in the same cycle
-     * </ol>
-     *
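-     * <p>A minimal sketch of the two colliding Iceberg operations (illustrative only; the
-     * variable names echo this test, and the actual commit calls live in IcebergLakeCommitter):
-     *
-     * <pre>{@code
-     * // RowDelta commits the data + delete files first, moving the snapshot from N to N+1
-     * icebergTable.newRowDelta().addRows(newDataFile).addDeletes(deleteFile).commit();
-     *
-     * // The rewrite was planned against snapshot N, which is now permanently stale
-     * icebergTable.newRewriteFiles()
-     *         .validateFromSnapshot(rewriteSnapshotId) // still N
-     *         .rewriteFiles(filesToDelete, filesToAdd)
-     *         .commit(); // validation keeps failing and retrying until the timeout expires
-     * }</pre>
-     *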
-     * <p>Bug behavior: RowDelta commits first (because delete files are present), advancing the
-     * snapshot from N to N+1. The rewrite then calls validateFromSnapshot(N), but N is now stale.
-     * Iceberg's retry loop detects the mismatch and retries, but the conflict is permanent because
-     * the validation snapshot is fixed. With default Iceberg settings
-     * (commit.retry.total-timeout-ms = 30 minutes), this causes blocking for 15+ minutes before
-     * failing.
-     */
-    @Test
-    void testRewriteFailsWhenDeleteFilesAddedInSameCycle() throws Exception {
-        // Create table and seed with initial data files
-        TablePath tablePath = TablePath.of("iceberg", "conflict_test_table");
-        createTable(tablePath);
-        Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath));
-        int bucket = 0;
-
-        // Write 3 initial data files that will be the target of the rewrite
-        appendDataFiles(icebergTable, 3, 10, 0, bucket);
-        icebergTable.refresh();
-
-        int initialFileCount = countDataFiles(icebergTable);
-        assertThat(initialFileCount).isEqualTo(3);
-
-        // Capture the current snapshot ID - this is what the rewrite will validate against
-        long rewriteSnapshotId = icebergTable.currentSnapshot().snapshotId();
-
-        // Collect the data files to be rewritten and get one to reference in delete file
-        DataFileSet filesToDelete = DataFileSet.create();
-        DataFile firstDataFile = null;
-        for (FileScanTask task : icebergTable.newScan().planFiles()) {
-            filesToDelete.add(task.file());
-            if (firstDataFile == null) {
-                firstDataFile = task.file();
-            }
-        }
-        assertThat(filesToDelete).hasSize(3);
-        assertThat(firstDataFile).isNotNull();
-
-        // Create the rewrite result: compacting the 3 files into 1
-        DataFileSet filesToAdd = DataFileSet.create();
-        filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket));
-
-        RewriteDataFileResult rewriteResult =
-                new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd);
-
-        // Create a delete file that references one of the data files being rewritten
-        // IMPORTANT: when RowDelta commits this delete file, it will conflict with the rewrite's
-        // validation
-        // since the file now has pending deletes
-        DeleteFile deleteFile =
-                createDeleteFile(icebergTable, firstDataFile.partition(), firstDataFile.location());
-
-        // Create new data file to include in the commit
-        DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket);
-
-        // Build a committable with data file, delete file, AND rewrite results
-        // The presence of delete file triggers RowDelta path which commits first
-        IcebergCommittable committable =
-                IcebergCommittable.builder()
-                        .addDataFile(newDataFile)
-                        .addDeleteFile(deleteFile)
-                        .addRewriteDataFileResult(rewriteResult)
-                        .build();
-
-        // Perform the commit operation (to trigger the bug)
-        IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath);
-        committer.commit(committable, Collections.emptyMap());
-        committer.close();
-
-        // Verify that the rewrite failed silently, leaving incorrect file counts
-        icebergTable.refresh();
-        int finalFileCount = countDataFiles(icebergTable);
-        assertThat(finalFileCount)
-                .as(
-                        "Rewrite failed silently. 
Expecting 4 files " - + "(3 original + 1 new) to demonstrate bug, should otherwise be 2 (due to compaction).") - .isEqualTo(4); - } - @Test void testRewriteWithDataFilesSucceeds() throws Exception { // Create table with minimal retry configuration to fail fast instead of blocking @@ -288,7 +197,6 @@ void testCommitSucceeds() throws Exception { committer.close(); } - /** Tests that committing data files with delete files (no rewrite) succeeds. */ @Test void testCommitWithDeleteFilesSucceeds() throws Exception { TablePath tablePath = TablePath.of("iceberg", "data_delete_table"); @@ -326,6 +234,75 @@ void testCommitWithDeleteFilesSucceeds() throws Exception { committer.close(); } + @Test + void testRewriteWithDeleteFilesInSameCycleSucceeds() throws Exception { + // Create table and seed with initial data files + TablePath tablePath = TablePath.of("iceberg", "conflict_test_table"); + createTable(tablePath); + Table icebergTable = icebergCatalog.loadTable(toIceberg(tablePath)); + int bucket = 0; + + // Write 3 initial data files that will be the target of the rewrite + appendDataFiles(icebergTable, 3, 10, 0, bucket); + icebergTable.refresh(); + + int initialFileCount = countDataFiles(icebergTable); + assertThat(initialFileCount).isEqualTo(3); + + // Capture the current snapshot ID - this is what the rewrite will validate against + long rewriteSnapshotId = icebergTable.currentSnapshot().snapshotId(); + + // Collect the data files to be rewritten and get one to reference in delete file + DataFileSet filesToDelete = DataFileSet.create(); + DataFile firstDataFile = null; + for (FileScanTask task : icebergTable.newScan().planFiles()) { + filesToDelete.add(task.file()); + if (firstDataFile == null) { + firstDataFile = task.file(); + } + } + assertThat(filesToDelete).hasSize(3); + assertThat(firstDataFile).isNotNull(); + + // Create the rewrite result: compacting the 3 files into 1 + DataFileSet filesToAdd = DataFileSet.create(); + filesToAdd.add(writeDataFile(icebergTable, 30, 0, bucket)); + + RewriteDataFileResult rewriteResult = + new RewriteDataFileResult(rewriteSnapshotId, filesToDelete, filesToAdd); + + // Create a delete file that references one of the data files being rewritten + // IMPORTANT: when RowDelta commits this delete file, it will conflict with the rewrite's + // validation + // since the file now has pending deletes + DeleteFile deleteFile = + createDeleteFile(icebergTable, firstDataFile.partition(), firstDataFile.location()); + + // Create new data file to include in the commit + DataFile newDataFile = writeDataFile(icebergTable, 5, 100, bucket); + + // Build a committable with data file, delete file, AND rewrite results + // The presence of delete file triggers RowDelta path which commits first + IcebergCommittable committable = + IcebergCommittable.builder() + .addDataFile(newDataFile) + .addDeleteFile(deleteFile) + .addRewriteDataFileResult(rewriteResult) + .build(); + + // Perform the commit operation + IcebergLakeCommitter committer = new IcebergLakeCommitter(catalogProvider, tablePath); + committer.commit(committable, Collections.emptyMap()); + committer.close(); + + // Verify that rewrite succeeded + icebergTable.refresh(); + int finalFileCount = countDataFiles(icebergTable); + assertThat(finalFileCount) + .as("Rewrite should succeed with delete files present - got %d files", finalFileCount) + .isEqualTo(2); + } + // Helper methods private void createTable(TablePath tablePath) { From 1e49d987c03f9450b4e4c3232e658db838c3e068 Mon Sep 17 00:00:00 2001 From: Rion Williams Date: 
Tue, 20 Jan 2026 21:59:53 -0600 Subject: [PATCH 4/6] [lake/iceberg] Minor clean-up and updated comments to improve clarity --- .../iceberg/tiering/IcebergLakeCommitter.java | 15 ++++++++++++--- .../iceberg/tiering/IcebergLakeCommitterTest.java | 2 +- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java index d974a5a31e..e3fd6e3a97 100644 --- a/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java +++ b/fluss-lake/fluss-lake-iceberg/src/main/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitter.java @@ -106,9 +106,18 @@ public long commit(IcebergCommittable committable, Map snapshotP Long snapshotId = null; - // Since rewrite operations validate against a specific snapshot ID, it's important - // to commit rewrite files BEFORE data/delete files to avoid the snapshot advancing - // too soon (causing rewrite validation to fail) + /* + Rewrite files MUST be committed BEFORE data/delete files. + + Rewrite operations call validateFromSnapshot(snapshotId) to ensure the files being + replaced haven't changed since the rewrite was planned. If we commit data/delete + files first (via AppendFiles or RowDelta), the table's current snapshot advances. + The subsequent rewrite validation then fails because it's checking against a + now-stale snapshot ID, triggering Iceberg's retry loop indefinitely. + + By committing rewrites first, the validation succeeds against the current snapshot, + and subsequent data/delete commits simply advance the snapshot further. + */ List rewriteDataFileResults = committable.rewriteDataFileResults(); if (!rewriteDataFileResults.isEmpty()) { diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java index a804fdcbcc..9811a63b93 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/tiering/IcebergLakeCommitterTest.java @@ -299,7 +299,7 @@ void testRewriteWithDeleteFilesInSameCycleSucceeds() throws Exception { icebergTable.refresh(); int finalFileCount = countDataFiles(icebergTable); assertThat(finalFileCount) - .as("Rewrite should succeed with delete files present - got %d files", finalFileCount) + .as("Rewrite should succeed with delete files present") .isEqualTo(2); } From d88232e3775c528af1795b8083993277d03dba4a Mon Sep 17 00:00:00 2001 From: Rion Williams Date: Tue, 20 Jan 2026 23:39:44 -0600 Subject: [PATCH 5/6] [lake/iceberg] Updated previous rewrite-conflict integration test to align with updated commit behavior --- .../iceberg/maintenance/IcebergRewriteITCase.java | 11 ++++++----- .../testutils/FlinkIcebergTieringTestBase.java | 10 ++++++---- 2 files changed, 12 insertions(+), 9 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java index 4b6d9de71a..3893276ce6 100644 --- 
a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java @@ -126,7 +126,7 @@ private void checkRecords(List actualRows, List expectedRow } @Test - void testPkTableCompactionWithConflict() throws Exception { + void testPkTableCompactionWithDeletedFiles() throws Exception { JobClient jobClient = buildTieringJob(execEnv); try { TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2"); @@ -147,17 +147,18 @@ void testPkTableCompactionWithConflict() throws Exception { // add pos-delete and trigger compaction rows = Arrays.asList(row(4, "v1"), row(4, "v2")); flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1)); - // rewritten files should fail to commit due to conflict, add check here checkRecords(getIcebergRecords(t1), flussRows); - // 4 data file and 1 delete file - checkFileStatusInIcebergTable(t1, 4, true); + checkFileStatusInIcebergTable(t1, 2, true); // previous compaction conflicts won't prevent further compaction, and check iceberg // records rows = Collections.singletonList(row(5, "v1")); flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows)); checkRecords(getIcebergRecords(t1), flussRows); - checkFileStatusInIcebergTable(t1, 2, false); + // check that the correct number of files (2 from compaction + 1 new) exist and that a + // deletion + // file is present + checkFileStatusInIcebergTable(t1, 3, true); } finally { jobClient.cancel().get(); } diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java index 99fd340cf4..fe690c5a5d 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java @@ -358,17 +358,19 @@ protected void checkFileStatusInIcebergTable( throws IOException { org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath)); int count = 0; + boolean deleteFileExists = false; try (CloseableIterable tasks = table.newScan().planFiles()) { for (FileScanTask ignored : tasks) { - if (shouldDeleteFileExist) { - assertThat(ignored.deletes()).isNotEmpty(); - } else { - assertThat(ignored.deletes()).isEmpty(); + // A deletion status may be mixed (some may contain deletions and + // others may not) so we want to flag if deletions are present + if (shouldDeleteFileExist && !deleteFileExists) { + deleteFileExists = !ignored.deletes().isEmpty(); } count++; } } assertThat(count).isEqualTo(expectedFileCount); + assertThat(deleteFileExists).isEqualTo(shouldDeleteFileExist); } protected void checkDataInIcebergAppendOnlyPartitionedTable( From 811f5952853c43d0b7922ef0c784bab24bceb9f4 Mon Sep 17 00:00:00 2001 From: Rion Williams Date: Tue, 20 Jan 2026 23:44:58 -0600 Subject: [PATCH 6/6] [lake/iceberg] Minor clean-up and spotless:apply formatting --- .../lake/iceberg/maintenance/IcebergRewriteITCase.java | 9 ++++----- .../iceberg/testutils/FlinkIcebergTieringTestBase.java | 4 ++-- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java 
b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java index 3893276ce6..022717bc3f 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/maintenance/IcebergRewriteITCase.java @@ -150,14 +150,13 @@ void testPkTableCompactionWithDeletedFiles() throws Exception { checkRecords(getIcebergRecords(t1), flussRows); checkFileStatusInIcebergTable(t1, 2, true); - // previous compaction conflicts won't prevent further compaction, and check iceberg - // records + // previous compaction conflicts won't prevent further compaction, and + // check iceberg records rows = Collections.singletonList(row(5, "v1")); flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows)); checkRecords(getIcebergRecords(t1), flussRows); - // check that the correct number of files (2 from compaction + 1 new) exist and that a - // deletion - // file is present + // check that the correct number of files (2 from compaction + 1 new) + // exist and that a deletion file is present checkFileStatusInIcebergTable(t1, 3, true); } finally { jobClient.cancel().get(); diff --git a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java index fe690c5a5d..45e3f5c9ee 100644 --- a/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java +++ b/fluss-lake/fluss-lake-iceberg/src/test/java/org/apache/fluss/lake/iceberg/testutils/FlinkIcebergTieringTestBase.java @@ -361,8 +361,8 @@ protected void checkFileStatusInIcebergTable( boolean deleteFileExists = false; try (CloseableIterable tasks = table.newScan().planFiles()) { for (FileScanTask ignored : tasks) { - // A deletion status may be mixed (some may contain deletions and - // others may not) so we want to flag if deletions are present + // Not all data files may have associated delete files, so track if + // any exist when the shouldDeleteFileExist flag is set if (shouldDeleteFileExist && !deleteFileExists) { deleteFileExists = !ignored.deletes().isEmpty(); }