Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,42 +104,53 @@ public long commit(IcebergCommittable committable, Map<String, String> snapshotP
// Refresh table to get latest metadata
icebergTable.refresh();

SnapshotUpdate<?> snapshotUpdate;
if (committable.getDeleteFiles().isEmpty()) {
// Simple append-only case: only data files, no delete files or compaction
AppendFiles appendFiles = icebergTable.newAppend();
committable.getDataFiles().forEach(appendFiles::appendFile);
snapshotUpdate = appendFiles;
} else {
/*
Row delta validations are not needed for streaming changes that write equality
deletes. Equality deletes are applied to data in all previous sequence numbers,
so retries may push deletes further in the future, but do not affect correctness.
Position deletes committed to the table in this path are used only to delete rows
from data files that are being added in this commit. There is no way for data
files added along with the delete files to be concurrently removed, so there is
no need to validate the files referenced by the position delete files that are
being committed.
*/
RowDelta rowDelta = icebergTable.newRowDelta();
committable.getDataFiles().forEach(rowDelta::addRows);
committable.getDeleteFiles().forEach(rowDelta::addDeletes);
snapshotUpdate = rowDelta;
}
Long snapshotId = null;

/*
Rewrite files MUST be committed BEFORE data/delete files.

// commit written files
long snapshotId = commit(snapshotUpdate, snapshotProperties);
Rewrite operations call validateFromSnapshot(snapshotId) to ensure the files being
replaced haven't changed since the rewrite was planned. If we commit data/delete
files first (via AppendFiles or RowDelta), the table's current snapshot advances.
The subsequent rewrite validation then fails because it's checking against a
now-stale snapshot ID, triggering Iceberg's retry loop indefinitely.

// There exists rewrite files, commit rewrite files
By committing rewrites first, the validation succeeds against the current snapshot,
and subsequent data/delete commits simply advance the snapshot further.
*/
List<RewriteDataFileResult> rewriteDataFileResults =
committable.rewriteDataFileResults();
if (!rewriteDataFileResults.isEmpty()) {
Long rewriteCommitSnapshotId =
commitRewrite(rewriteDataFileResults, snapshotProperties);
if (rewriteCommitSnapshotId != null) {
snapshotId = rewriteCommitSnapshotId;
snapshotId = commitRewrite(rewriteDataFileResults, snapshotProperties);
}

// Commit data/delete files
if (!committable.getDataFiles().isEmpty() || !committable.getDeleteFiles().isEmpty()) {
SnapshotUpdate<?> snapshotUpdate;
if (committable.getDeleteFiles().isEmpty()) {
// Simple append-only case: only data files, no delete files
AppendFiles appendFiles = icebergTable.newAppend();
committable.getDataFiles().forEach(appendFiles::appendFile);
snapshotUpdate = appendFiles;
} else {
/*
Row delta validations are not needed for streaming changes that write equality
deletes. Equality deletes are applied to data in all previous sequence numbers,
so retries may push deletes further in the future, but do not affect correctness.
Position deletes committed to the table in this path are used only to delete rows
from data files that are being added in this commit. There is no way for data
files added along with the delete files to be concurrently removed, so there is
no need to validate the files referenced by the position delete files that are
being committed.
*/
RowDelta rowDelta = icebergTable.newRowDelta();
committable.getDataFiles().forEach(rowDelta::addRows);
committable.getDeleteFiles().forEach(rowDelta::addDeletes);
snapshotUpdate = rowDelta;
}
snapshotId = commit(snapshotUpdate, snapshotProperties);
}

return checkNotNull(snapshotId, "Iceberg committed snapshot id must be non-null.");
} catch (Exception e) {
throw new IOException("Failed to commit to Iceberg table.", e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ private void checkRecords(List<Record> actualRows, List<InternalRow> expectedRow
}

@Test
void testPkTableCompactionWithConflict() throws Exception {
void testPkTableCompactionWithDeletedFiles() throws Exception {
JobClient jobClient = buildTieringJob(execEnv);
try {
TablePath t1 = TablePath.of(DEFAULT_DB, "pk_table_2");
Expand All @@ -147,17 +147,17 @@ void testPkTableCompactionWithConflict() throws Exception {
// add pos-delete and trigger compaction
rows = Arrays.asList(row(4, "v1"), row(4, "v2"));
flussRows.add(writeIcebergTableRecords(t1, t1Bucket, 6, false, rows).get(1));
// rewritten files should fail to commit due to conflict, add check here
checkRecords(getIcebergRecords(t1), flussRows);
// 2 data files, with delete file(s) present
checkFileStatusInIcebergTable(t1, 4, true);
checkFileStatusInIcebergTable(t1, 2, true);

// previous compaction conflicts won't prevent further compaction, and check iceberg
// records
// previous compaction conflicts won't prevent further compaction, and
// check iceberg records
rows = Collections.singletonList(row(5, "v1"));
flussRows.addAll(writeIcebergTableRecords(t1, t1Bucket, 7, false, rows));
checkRecords(getIcebergRecords(t1), flussRows);
checkFileStatusInIcebergTable(t1, 2, false);
// check that the correct number of files (2 from compaction + 1 new)
// exist and that a deletion file is present
checkFileStatusInIcebergTable(t1, 3, true);
} finally {
jobClient.cancel().get();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -358,17 +358,19 @@ protected void checkFileStatusInIcebergTable(
throws IOException {
org.apache.iceberg.Table table = icebergCatalog.loadTable(toIceberg(tablePath));
int count = 0;
boolean deleteFileExists = false;
try (CloseableIterable<FileScanTask> tasks = table.newScan().planFiles()) {
for (FileScanTask ignored : tasks) {
if (shouldDeleteFileExist) {
assertThat(ignored.deletes()).isNotEmpty();
} else {
assertThat(ignored.deletes()).isEmpty();
// Not all data files may have associated delete files, so track if
// any exist when the shouldDeleteFileExist flag is set
if (shouldDeleteFileExist && !deleteFileExists) {
deleteFileExists = !ignored.deletes().isEmpty();
}
count++;
}
}
assertThat(count).isEqualTo(expectedFileCount);
assertThat(deleteFileExists).isEqualTo(shouldDeleteFileExist);
}

protected void checkDataInIcebergAppendOnlyPartitionedTable(
Expand Down
Loading