From 1837ed70a07663819babb97216d08796b5948635 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Tue, 13 Jan 2026 15:04:06 +0800 Subject: [PATCH 1/3] ver1 --- src/iceberg/CMakeLists.txt | 1 + src/iceberg/meson.build | 1 + src/iceberg/table_metadata.cc | 10 +- src/iceberg/test/CMakeLists.txt | 1 + src/iceberg/test/set_snapshot_test.cc | 236 ++++++++++++++++++++++++++ src/iceberg/transaction.cc | 8 + src/iceberg/update/meson.build | 1 + src/iceberg/update/pending_update.h | 1 + src/iceberg/update/set_snapshot.cc | 146 ++++++++++++++++ src/iceberg/update/set_snapshot.h | 97 +++++++++++ 10 files changed, 497 insertions(+), 5 deletions(-) create mode 100644 src/iceberg/test/set_snapshot_test.cc create mode 100644 src/iceberg/update/set_snapshot.cc create mode 100644 src/iceberg/update/set_snapshot.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 86a0efd7e..9a49997d2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -84,6 +84,7 @@ set(ICEBERG_SOURCES update/expire_snapshots.cc update/pending_update.cc update/snapshot_update.cc + update/set_snapshot.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 87f508cd5..858f187a4 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -104,6 +104,7 @@ iceberg_sources = files( 'type.cc', 'update/expire_snapshots.cc', 'update/pending_update.cc', + 'update/set_snapshot.cc', 'update/snapshot_update.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 7e357bbae..af81d9a3a 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -668,6 +668,9 @@ class TableMetadataBuilder::Impl { Result> UpdateSnapshotLog( int64_t current_snapshot_id) const; + /// \brief Internal method to set a branch snapshot + /// \param snapshot The snapshot to set + /// \param branch The branch name Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch); private: @@ -1090,11 +1093,8 @@ Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id, // change is a noop return {}; } - - auto snapshot_it = snapshots_by_id_.find(snapshot_id); - ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), - "Cannot set {} to unknown snapshot: {}", branch, snapshot_id); - return SetBranchSnapshotInternal(*snapshot_it->second, branch); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata_.SnapshotById(snapshot_id)); + return SetBranchSnapshotInternal(*snapshot, branch); } Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr snapshot, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1f6ab5521..a80bc1d15 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -171,6 +171,7 @@ if(ICEBERG_BUILD_BUNDLE) USE_BUNDLE SOURCES expire_snapshots_test.cc + set_snapshot_test.cc transaction_test.cc update_partition_spec_test.cc update_properties_test.cc diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc new file mode 100644 index 000000000..75a8fe2c0 --- /dev/null +++ b/src/iceberg/test/set_snapshot_test.cc @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/set_snapshot.h" + +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/snapshot.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" +#include "iceberg/transaction.h" + +namespace iceberg { + +// Test fixture for SetSnapshot tests +class SetSnapshotTest : public UpdateTestBase { + protected: + // Snapshot IDs from TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotId = 3051729675574597004; + static constexpr int64_t kCurrentSnapshotId = 3055729675574597004; + + // Timestamps from TableMetadataV2Valid.json + static constexpr int64_t kOldestSnapshotTimestamp = 1515100955770; + static constexpr int64_t kCurrentSnapshotTimestamp = 1555100955770; +}; + +TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Set current snapshot to the older snapshot + set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Set current snapshot to the current snapshot (no-op) + set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Try to set to a non-existent snapshot + int64_t invalid_snapshot_id = 9999999999999999; + set_snapshot->SetCurrentSnapshot(invalid_snapshot_id); + + // Should fail during Apply + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); +} + +TEST_F(SetSnapshotTest, RollbackToValid) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Rollback to the oldest snapshot (which is an ancestor) + set_snapshot->RollbackTo(kOldestSnapshotId); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Try to rollback to a non-existent snapshot + int64_t invalid_snapshot_id = 9999999999999999; + set_snapshot->RollbackTo(invalid_snapshot_id); + + // Should fail during Apply + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); +} + +TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Rollback to a time between the two snapshots + // This should select the oldest snapshot + int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2; + set_snapshot->RollbackToTime(time_between); + + // Apply and verify + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Try to rollback to a time before any snapshot + int64_t time_before_all = kOldestSnapshotTimestamp - 1000000; + set_snapshot->RollbackToTime(time_before_all); + + // Should fail - no snapshot older than the specified time + auto result = set_snapshot->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("no valid snapshot older than")); +} + +TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Rollback to a timestamp just after the oldest snapshot + // This should return the oldest snapshot (the latest one before this time) + int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1; + set_snapshot->RollbackToTime(time_just_after_oldest); + + // Apply and verify - should return the oldest snapshot + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, ApplyWithoutChanges) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Apply without making any changes (NOOP) + ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + + // Should return current snapshot + EXPECT_NE(result, nullptr); + EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, MethodChaining) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Test that methods return reference for chaining + // Note: Only the last operation should take effect + auto& result1 = set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + EXPECT_EQ(&result1, set_snapshot.get()); +} + +TEST_F(SetSnapshotTest, CommitSuccess) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Set to oldest snapshot + set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); + + // Commit the change + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + + // Commit the transaction + ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); + + // Verify the current snapshot was changed + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); +} + +TEST_F(SetSnapshotTest, CommitEmptyUpdate) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Commit without making any changes (NOOP) + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + + // Commit the transaction + ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); + + // Verify the current snapshot remained the same + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId); +} + +TEST_F(SetSnapshotTest, KindReturnsSetSnapshot) { + // Create transaction and SetSnapshot + ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + + // Verify the kind is correct + EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index f763c5670..f99ca5f02 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -24,6 +24,7 @@ #include "iceberg/catalog.h" #include "iceberg/schema.h" +#include "iceberg/snapshot.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -32,6 +33,7 @@ #include "iceberg/table_update.h" #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/pending_update.h" +#include "iceberg/update/set_snapshot.h" #include "iceberg/update/snapshot_update.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" @@ -183,6 +185,12 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove)); } } break; + case PendingUpdate::Kind::kSetSnapshot: { + auto& set_snapshot = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto snapshot, set_snapshot.Apply()); + metadata_builder_->SetBranchSnapshot(snapshot->snapshot_id, + std::string(SnapshotRef::kMainBranch)); + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4238e0222..4ce209d76 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -18,6 +18,7 @@ install_headers( [ 'pending_update.h', + 'set_snapshot.h', 'snapshot_update.h', 'update_partition_spec.h', 'update_schema.h', diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 8a8329eee..12cd9879c 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -48,6 +48,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateSchema, kUpdateSnapshot, kUpdateSortOrder, + kSetSnapshot, }; /// \brief Return the kind of this pending update. diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc new file mode 100644 index 000000000..f1a173872 --- /dev/null +++ b/src/iceberg/update/set_snapshot.cc @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/set_snapshot.h" + +#include +#include +#include + +#include "iceberg/snapshot.h" +#include "iceberg/table_metadata.h" +#include "iceberg/transaction.h" +#include "iceberg/util/error_collector.h" +#include "iceberg/util/macros.h" +#include "iceberg/util/snapshot_util_internal.h" +#include "iceberg/util/timepoint.h" + +namespace iceberg { + +Result> SetSnapshot::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create SetSnapshot without a transaction"); + return std::shared_ptr(new SetSnapshot(std::move(transaction))); +} + +SetSnapshot::SetSnapshot(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +SetSnapshot::~SetSnapshot() = default; + +SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) { + const TableMetadata& base_metadata = transaction_->current(); + + // Validate that the snapshot exists + auto snapshot_result = base_metadata.SnapshotById(snapshot_id); + ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", snapshot_id); + + target_snapshot_id_ = snapshot_id; + + return *this; +} + +SetSnapshot& SetSnapshot::RollbackToTime(int64_t timestamp_ms) { + // Find the latest snapshot by timestamp older than timestamp_ms + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot_opt, + FindLatestAncestorOlderThan(timestamp_ms)); + + ICEBERG_BUILDER_CHECK(snapshot_opt.has_value(), + "Cannot roll back, no valid snapshot older than: {}", + timestamp_ms); + + target_snapshot_id_ = snapshot_opt.value()->snapshot_id; + is_rollback_ = true; + + return *this; +} + +SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { + const TableMetadata& current = transaction_->current(); + + // Validate that the snapshot exists + auto snapshot_result = current.SnapshotById(snapshot_id); + ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", snapshot_id); + + // Validate that the snapshot is an ancestor of the current state + ICEBERG_BUILDER_ASSIGN_OR_RETURN( + bool is_ancestor, SnapshotUtil::IsAncestorOf(*transaction_->table(), snapshot_id)); + ICEBERG_BUILDER_CHECK( + is_ancestor, + "Cannot roll back to snapshot, not an ancestor of the current state: {}", + snapshot_id); + + return SetCurrentSnapshot(snapshot_id); +} + +Result> SetSnapshot::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + const TableMetadata& base_metadata = transaction_->current(); + + // If no target snapshot was configured, return current state (NOOP) + if (!target_snapshot_id_.has_value()) { + return base_metadata.Snapshot(); + } + + // If this is a rollback, validate that the target is still an ancestor + if (is_rollback_) { + ICEBERG_ASSIGN_OR_RAISE( + bool is_ancestor, + SnapshotUtil::IsAncestorOf(*transaction_->table(), target_snapshot_id_.value())); + ICEBERG_CHECK(is_ancestor, + "Cannot roll back to {}: not an ancestor of the current table state", + target_snapshot_id_.value()); + } + + return base_metadata.SnapshotById(target_snapshot_id_.value()); +} + +Result>> SetSnapshot::FindLatestAncestorOlderThan( + int64_t timestamp_ms) const { + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, + SnapshotUtil::CurrentAncestors(*transaction_->table())); + + int64_t snapshot_timestamp = 0; + std::shared_ptr result = nullptr; + + for (const auto& snapshot : ancestors) { + if (snapshot == nullptr) { + continue; + } + + int64_t snap_timestamp_ms = UnixMsFromTimePointMs(snapshot->timestamp_ms); + + if (snap_timestamp_ms < timestamp_ms && snap_timestamp_ms > snapshot_timestamp) { + result = snapshot; + snapshot_timestamp = snap_timestamp_ms; + } + } + + if (result == nullptr) { + return std::nullopt; + } + + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h new file mode 100644 index 000000000..b2ac41fec --- /dev/null +++ b/src/iceberg/update/set_snapshot.h @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/set_snapshot.h +/// \brief Sets the current snapshot directly or by rolling back. + +namespace iceberg { + +/// \brief Sets the current snapshot directly or by rolling back. +/// +/// This update is not exposed through the Table API. Instead, it is a package-private +/// part of the Transaction API intended for use in ManageSnapshots. +/// +/// When committing, these changes will be applied to the current table metadata. +/// Commit conflicts will not be resolved and will result in a CommitFailed error. +class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~SetSnapshot() override; + + /// \brief Sets the table's current state to a specific Snapshot identified by id. + /// + /// This method allows setting the current snapshot to any valid snapshot defined + /// in the table metadata, regardless of its relationship to the current state. + /// + /// \param snapshot_id The snapshot ID to set as current + /// \return Reference to this for method chaining + SetSnapshot& SetCurrentSnapshot(int64_t snapshot_id); + + /// \brief Rolls back the table's state to the last Snapshot before the given timestamp. + /// + /// This method traverses the history of the current snapshot to find the most recent + /// ancestor that happened before the specified time. + /// + /// \param timestamp_ms A timestamp in milliseconds since the Unix epoch + /// \return Reference to this for method chaining + SetSnapshot& RollbackToTime(int64_t timestamp_ms); + + /// \brief Rollback table's state to a specific Snapshot identified by id. + /// + /// This method strictly validates that the target snapshot is an ancestor of the + /// current table state. + /// + /// \param snapshot_id The snapshot ID to roll back to. Must be an ancestor of the + /// current snapshot + /// \return Reference to this for method chaining + SetSnapshot& RollbackTo(int64_t snapshot_id); + + Kind kind() const final { return Kind::kSetSnapshot; } + + /// \brief Apply the pending changes and return the target snapshot. + Result> Apply(); + + private: + explicit SetSnapshot(std::shared_ptr transaction); + + /// \brief Find the latest snapshot whose timestamp is before the provided timestamp. + /// + /// \param timestamp_ms Lookup snapshots before this timestamp + /// \return The snapshot that was current at the given timestamp, or nullopt + Result>> FindLatestAncestorOlderThan( + int64_t timestamp_ms) const; + + std::optional target_snapshot_id_; + bool is_rollback_{false}; +}; + +} // namespace iceberg From 107cd63853154cb01f0ac12a275572e244dcf4d5 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Thu, 15 Jan 2026 11:07:48 +0800 Subject: [PATCH 2/3] fix review --- src/iceberg/table_metadata.cc | 7 +++++-- src/iceberg/update/pending_update.h | 2 +- src/iceberg/update/set_snapshot.cc | 8 ++------ src/iceberg/update/set_snapshot.h | 6 ------ 4 files changed, 8 insertions(+), 15 deletions(-) diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index af81d9a3a..5e8abf468 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -1093,8 +1093,11 @@ Status TableMetadataBuilder::Impl::SetBranchSnapshot(int64_t snapshot_id, // change is a noop return {}; } - ICEBERG_ASSIGN_OR_RAISE(auto snapshot, metadata_.SnapshotById(snapshot_id)); - return SetBranchSnapshotInternal(*snapshot, branch); + + auto snapshot_it = snapshots_by_id_.find(snapshot_id); + ICEBERG_CHECK(snapshot_it != snapshots_by_id_.end(), + "Cannot set {} to unknown snapshot: {}", branch, snapshot_id); + return SetBranchSnapshotInternal(*snapshot_it->second, branch); } Status TableMetadataBuilder::Impl::SetBranchSnapshot(std::shared_ptr snapshot, diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 12cd9879c..c6c5fc760 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -43,12 +43,12 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: enum class Kind : uint8_t { kExpireSnapshots, + kSetSnapshot, kUpdatePartitionSpec, kUpdateProperties, kUpdateSchema, kUpdateSnapshot, kUpdateSortOrder, - kSetSnapshot, }; /// \brief Return the kind of this pending update. diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc index f1a173872..a22c07b63 100644 --- a/src/iceberg/update/set_snapshot.cc +++ b/src/iceberg/update/set_snapshot.cc @@ -46,10 +46,8 @@ SetSnapshot::SetSnapshot(std::shared_ptr transaction) SetSnapshot::~SetSnapshot() = default; SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) { - const TableMetadata& base_metadata = transaction_->current(); - // Validate that the snapshot exists - auto snapshot_result = base_metadata.SnapshotById(snapshot_id); + auto snapshot_result = base().SnapshotById(snapshot_id); ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), "Cannot roll back to unknown snapshot id: {}", snapshot_id); @@ -74,10 +72,8 @@ SetSnapshot& SetSnapshot::RollbackToTime(int64_t timestamp_ms) { } SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { - const TableMetadata& current = transaction_->current(); - // Validate that the snapshot exists - auto snapshot_result = current.SnapshotById(snapshot_id); + auto snapshot_result = base().SnapshotById(snapshot_id); ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), "Cannot roll back to unknown snapshot id: {}", snapshot_id); diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h index b2ac41fec..e1c6082f4 100644 --- a/src/iceberg/update/set_snapshot.h +++ b/src/iceberg/update/set_snapshot.h @@ -34,12 +34,6 @@ namespace iceberg { /// \brief Sets the current snapshot directly or by rolling back. -/// -/// This update is not exposed through the Table API. Instead, it is a package-private -/// part of the Transaction API intended for use in ManageSnapshots. -/// -/// When committing, these changes will be applied to the current table metadata. -/// Commit conflicts will not be resolved and will result in a CommitFailed error. class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { public: static Result> Make( From 0bce749c2cd7ae0abbcdff1f41565d25ed255788 Mon Sep 17 00:00:00 2001 From: Feiyang Li Date: Fri, 16 Jan 2026 15:17:18 +0800 Subject: [PATCH 3/3] fix review2 --- src/iceberg/table.cc | 7 ++ src/iceberg/table.h | 4 + src/iceberg/table_metadata.cc | 3 - src/iceberg/test/set_snapshot_test.cc | 140 +++++----------------- src/iceberg/transaction.cc | 11 +- src/iceberg/transaction.h | 5 + src/iceberg/type_fwd.h | 1 + src/iceberg/update/set_snapshot.cc | 43 +++---- src/iceberg/update/set_snapshot.h | 23 +--- src/iceberg/util/snapshot_util.cc | 31 ++++- src/iceberg/util/snapshot_util_internal.h | 17 +++ 11 files changed, 128 insertions(+), 157 deletions(-) diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index c79ac53fb..e85a77925 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -192,6 +192,13 @@ Result> Table::NewExpireSnapshots() { return transaction->NewExpireSnapshots(); } +Result> Table::NewSetSnapshot() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewSetSnapshot(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index cc9482486..9aa8532ee 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -151,6 +151,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewExpireSnapshots(); + /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a + /// previous snapshot and commit the changes. + virtual Result> NewSetSnapshot(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 5e8abf468..7e357bbae 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -668,9 +668,6 @@ class TableMetadataBuilder::Impl { Result> UpdateSnapshotLog( int64_t current_snapshot_id) const; - /// \brief Internal method to set a branch snapshot - /// \param snapshot The snapshot to set - /// \param branch The branch name Status SetBranchSnapshotInternal(const Snapshot& snapshot, const std::string& branch); private: diff --git a/src/iceberg/test/set_snapshot_test.cc b/src/iceberg/test/set_snapshot_test.cc index 75a8fe2c0..c25c3cc97 100644 --- a/src/iceberg/test/set_snapshot_test.cc +++ b/src/iceberg/test/set_snapshot_test.cc @@ -45,38 +45,31 @@ class SetSnapshotTest : public UpdateTestBase { }; TEST_F(SetSnapshotTest, SetCurrentSnapshotValid) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); + EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot); - // Set current snapshot to the older snapshot set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); - // Apply and verify - ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); - EXPECT_NE(result, nullptr); - EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); + + // Commit and verify the change was persisted + EXPECT_THAT(set_snapshot->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); } TEST_F(SetSnapshotTest, SetCurrentSnapshotToCurrentSnapshot) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - - // Set current snapshot to the current snapshot (no-op) + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); set_snapshot->SetCurrentSnapshot(kCurrentSnapshotId); - // Apply and verify - ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); - EXPECT_NE(result, nullptr); - EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kCurrentSnapshotId); } TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Try to set to a non-existent snapshot int64_t invalid_snapshot_id = 9999999999999999; set_snapshot->SetCurrentSnapshot(invalid_snapshot_id); @@ -84,28 +77,21 @@ TEST_F(SetSnapshotTest, SetCurrentSnapshotInvalid) { // Should fail during Apply auto result = set_snapshot->Apply(); EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); - EXPECT_THAT(result, HasErrorMessage("unknown snapshot id")); + EXPECT_THAT(result, HasErrorMessage("is not found")); } TEST_F(SetSnapshotTest, RollbackToValid) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Rollback to the oldest snapshot (which is an ancestor) set_snapshot->RollbackTo(kOldestSnapshotId); // Apply and verify - ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); - EXPECT_NE(result, nullptr); - EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); } TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Try to rollback to a non-existent snapshot int64_t invalid_snapshot_id = 9999999999999999; set_snapshot->RollbackTo(invalid_snapshot_id); @@ -117,26 +103,19 @@ TEST_F(SetSnapshotTest, RollbackToInvalidSnapshot) { } TEST_F(SetSnapshotTest, RollbackToTimeValidOldestSnapshot) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Rollback to a time between the two snapshots // This should select the oldest snapshot int64_t time_between = (kOldestSnapshotTimestamp + kCurrentSnapshotTimestamp) / 2; set_snapshot->RollbackToTime(time_between); // Apply and verify - ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); - EXPECT_NE(result, nullptr); - EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); } TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Try to rollback to a time before any snapshot int64_t time_before_all = kOldestSnapshotTimestamp - 1000000; set_snapshot->RollbackToTime(time_before_all); @@ -148,89 +127,30 @@ TEST_F(SetSnapshotTest, RollbackToTimeBeforeAnySnapshot) { } TEST_F(SetSnapshotTest, RollbackToTimeExactMatch) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Rollback to a timestamp just after the oldest snapshot // This should return the oldest snapshot (the latest one before this time) int64_t time_just_after_oldest = kOldestSnapshotTimestamp + 1; set_snapshot->RollbackToTime(time_just_after_oldest); // Apply and verify - should return the oldest snapshot - ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); - EXPECT_NE(result, nullptr); - EXPECT_EQ(result->snapshot_id, kOldestSnapshotId); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); + EXPECT_EQ(snapshot_id, kOldestSnapshotId); } TEST_F(SetSnapshotTest, ApplyWithoutChanges) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - + ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, table_->NewSetSnapshot()); // Apply without making any changes (NOOP) - ICEBERG_UNWRAP_OR_FAIL(auto result, set_snapshot->Apply()); + ICEBERG_UNWRAP_OR_FAIL(auto snapshot_id, set_snapshot->Apply()); // Should return current snapshot - EXPECT_NE(result, nullptr); - EXPECT_EQ(result->snapshot_id, kCurrentSnapshotId); -} + EXPECT_EQ(snapshot_id, kCurrentSnapshotId); -TEST_F(SetSnapshotTest, MethodChaining) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - - // Test that methods return reference for chaining - // Note: Only the last operation should take effect - auto& result1 = set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); - EXPECT_EQ(&result1, set_snapshot.get()); -} - -TEST_F(SetSnapshotTest, CommitSuccess) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - - // Set to oldest snapshot - set_snapshot->SetCurrentSnapshot(kOldestSnapshotId); - - // Commit the change + // Commit NOOP and verify nothing changed EXPECT_THAT(set_snapshot->Commit(), IsOk()); - - // Commit the transaction - ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); - - // Verify the current snapshot was changed - ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); - ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); - EXPECT_EQ(current_snapshot->snapshot_id, kOldestSnapshotId); -} - -TEST_F(SetSnapshotTest, CommitEmptyUpdate) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - - // Commit without making any changes (NOOP) - EXPECT_THAT(set_snapshot->Commit(), IsOk()); - - // Commit the transaction - ICEBERG_UNWRAP_OR_FAIL(auto updated_table, txn->Commit()); - - // Verify the current snapshot remained the same ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); EXPECT_EQ(current_snapshot->snapshot_id, kCurrentSnapshotId); } -TEST_F(SetSnapshotTest, KindReturnsSetSnapshot) { - // Create transaction and SetSnapshot - ICEBERG_UNWRAP_OR_FAIL(auto txn, table_->NewTransaction()); - ICEBERG_UNWRAP_OR_FAIL(auto set_snapshot, SetSnapshot::Make(txn)); - - // Verify the kind is correct - EXPECT_EQ(set_snapshot->kind(), PendingUpdate::Kind::kSetSnapshot); -} - } // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index f99ca5f02..8f6a49b35 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -187,8 +187,8 @@ Status Transaction::Apply(PendingUpdate& update) { } break; case PendingUpdate::Kind::kSetSnapshot: { auto& set_snapshot = internal::checked_cast(update); - ICEBERG_ASSIGN_OR_RAISE(auto snapshot, set_snapshot.Apply()); - metadata_builder_->SetBranchSnapshot(snapshot->snapshot_id, + ICEBERG_ASSIGN_OR_RAISE(auto snapshot_id, set_snapshot.Apply()); + metadata_builder_->SetBranchSnapshot(snapshot_id, std::string(SnapshotRef::kMainBranch)); } break; default: @@ -288,4 +288,11 @@ Result> Transaction::NewExpireSnapshots() { return expire_snapshots; } +Result> Transaction::NewSetSnapshot() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr set_snapshot, + SetSnapshot::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(set_snapshot)); + return set_snapshot; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 057a27a90..fe5c2cc95 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -26,6 +26,7 @@ #include "iceberg/iceberg_export.h" #include "iceberg/result.h" #include "iceberg/type_fwd.h" +#include "iceberg/update/set_snapshot.h" namespace iceberg { @@ -82,6 +83,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewExpireSnapshots(); + /// \brief Create a new SetSnapshot to set the current snapshot or rollback to a + /// previous snapshot and commit the changes. + Result> NewSetSnapshot(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index c8854031d..61e5eb21d 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -194,6 +194,7 @@ class UpdateProperties; class UpdateSchema; class UpdateSortOrder; class ExpireSnapshots; +class SetSnapshot; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/set_snapshot.cc b/src/iceberg/update/set_snapshot.cc index a22c07b63..21ebbd710 100644 --- a/src/iceberg/update/set_snapshot.cc +++ b/src/iceberg/update/set_snapshot.cc @@ -47,12 +47,10 @@ SetSnapshot::~SetSnapshot() = default; SetSnapshot& SetSnapshot::SetCurrentSnapshot(int64_t snapshot_id) { // Validate that the snapshot exists - auto snapshot_result = base().SnapshotById(snapshot_id); - ICEBERG_BUILDER_CHECK(snapshot_result.has_value(), + ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto snapshot, base().SnapshotById(snapshot_id)); + ICEBERG_BUILDER_CHECK(snapshot != nullptr, "Cannot roll back to unknown snapshot id: {}", snapshot_id); - target_snapshot_id_ = snapshot_id; - return *this; } @@ -78,8 +76,8 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { "Cannot roll back to unknown snapshot id: {}", snapshot_id); // Validate that the snapshot is an ancestor of the current state - ICEBERG_BUILDER_ASSIGN_OR_RETURN( - bool is_ancestor, SnapshotUtil::IsAncestorOf(*transaction_->table(), snapshot_id)); + ICEBERG_BUILDER_ASSIGN_OR_RETURN(bool is_ancestor, + SnapshotUtil::IsAncestorOf(base(), snapshot_id)); ICEBERG_BUILDER_CHECK( is_ancestor, "Cannot roll back to snapshot, not an ancestor of the current state: {}", @@ -88,47 +86,52 @@ SetSnapshot& SetSnapshot::RollbackTo(int64_t snapshot_id) { return SetCurrentSnapshot(snapshot_id); } -Result> SetSnapshot::Apply() { +Result SetSnapshot::Apply() { ICEBERG_RETURN_UNEXPECTED(CheckErrors()); const TableMetadata& base_metadata = transaction_->current(); // If no target snapshot was configured, return current state (NOOP) if (!target_snapshot_id_.has_value()) { - return base_metadata.Snapshot(); + ICEBERG_ASSIGN_OR_RAISE(auto current_snapshot, base_metadata.Snapshot()); + return current_snapshot->snapshot_id; } + // Validate that the snapshot exists + auto snapshot_result = base_metadata.SnapshotById(target_snapshot_id_.value()); + ICEBERG_CHECK(snapshot_result.has_value(), + "Cannot roll back to unknown snapshot id: {}", + target_snapshot_id_.value()); + // If this is a rollback, validate that the target is still an ancestor if (is_rollback_) { ICEBERG_ASSIGN_OR_RAISE( bool is_ancestor, - SnapshotUtil::IsAncestorOf(*transaction_->table(), target_snapshot_id_.value())); + SnapshotUtil::IsAncestorOf(base_metadata, target_snapshot_id_.value())); ICEBERG_CHECK(is_ancestor, "Cannot roll back to {}: not an ancestor of the current table state", target_snapshot_id_.value()); } - return base_metadata.SnapshotById(target_snapshot_id_.value()); + return target_snapshot_id_.value(); } Result>> SetSnapshot::FindLatestAncestorOlderThan( int64_t timestamp_ms) const { - ICEBERG_ASSIGN_OR_RAISE(auto ancestors, - SnapshotUtil::CurrentAncestors(*transaction_->table())); + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, SnapshotUtil::CurrentAncestors(base())); - int64_t snapshot_timestamp = 0; + TimePointMs target_timestamp = TimePointMsFromUnixMs(timestamp_ms); + TimePointMs latest_timestamp = TimePointMsFromUnixMs(0); std::shared_ptr result = nullptr; - for (const auto& snapshot : ancestors) { + for (auto& snapshot : ancestors) { if (snapshot == nullptr) { continue; } - - int64_t snap_timestamp_ms = UnixMsFromTimePointMs(snapshot->timestamp_ms); - - if (snap_timestamp_ms < timestamp_ms && snap_timestamp_ms > snapshot_timestamp) { - result = snapshot; - snapshot_timestamp = snap_timestamp_ms; + auto current_timestamp = snapshot->timestamp_ms; + if (current_timestamp < target_timestamp && current_timestamp > latest_timestamp) { + latest_timestamp = current_timestamp; // Save timestamp before move + result = std::move(snapshot); } } diff --git a/src/iceberg/update/set_snapshot.h b/src/iceberg/update/set_snapshot.h index e1c6082f4..c7abf7034 100644 --- a/src/iceberg/update/set_snapshot.h +++ b/src/iceberg/update/set_snapshot.h @@ -42,37 +42,18 @@ class ICEBERG_EXPORT SetSnapshot : public PendingUpdate { ~SetSnapshot() override; /// \brief Sets the table's current state to a specific Snapshot identified by id. - /// - /// This method allows setting the current snapshot to any valid snapshot defined - /// in the table metadata, regardless of its relationship to the current state. - /// - /// \param snapshot_id The snapshot ID to set as current - /// \return Reference to this for method chaining SetSnapshot& SetCurrentSnapshot(int64_t snapshot_id); /// \brief Rolls back the table's state to the last Snapshot before the given timestamp. - /// - /// This method traverses the history of the current snapshot to find the most recent - /// ancestor that happened before the specified time. - /// - /// \param timestamp_ms A timestamp in milliseconds since the Unix epoch - /// \return Reference to this for method chaining SetSnapshot& RollbackToTime(int64_t timestamp_ms); /// \brief Rollback table's state to a specific Snapshot identified by id. - /// - /// This method strictly validates that the target snapshot is an ancestor of the - /// current table state. - /// - /// \param snapshot_id The snapshot ID to roll back to. Must be an ancestor of the - /// current snapshot - /// \return Reference to this for method chaining SetSnapshot& RollbackTo(int64_t snapshot_id); Kind kind() const final { return Kind::kSetSnapshot; } - /// \brief Apply the pending changes and return the target snapshot. - Result> Apply(); + /// \brief Apply the pending changes and return the target snapshot ID. + Result Apply(); private: explicit SetSnapshot(std::shared_ptr transaction); diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc index c3b93be8b..f6b8de055 100644 --- a/src/iceberg/util/snapshot_util.cc +++ b/src/iceberg/util/snapshot_util.cc @@ -69,6 +69,22 @@ Result SnapshotUtil::IsAncestorOf(const Table& table, return IsAncestorOf(table, current->snapshot_id, ancestor_snapshot_id); } +Result SnapshotUtil::IsAncestorOf(const TableMetadata& metadata, + int64_t ancestor_snapshot_id) { + ICEBERG_ASSIGN_OR_RAISE(auto current, metadata.Snapshot()); + ICEBERG_CHECK(current != nullptr, "Current snapshot is null"); + + // Create a lookup function that uses the metadata + auto lookup = [&metadata](int64_t id) -> Result> { + return metadata.SnapshotById(id); + }; + + ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(current->snapshot_id, lookup)); + return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& snapshot) { + return snapshot != nullptr && snapshot->snapshot_id == ancestor_snapshot_id; + }); +} + Result SnapshotUtil::IsParentAncestorOf(const Table& table, int64_t snapshot_id, int64_t ancestor_parent_snapshot_id) { ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, snapshot_id)); @@ -86,6 +102,19 @@ Result>> SnapshotUtil::CurrentAncestors( return AncestorsOf(table, current_result.value()); } +Result>> SnapshotUtil::CurrentAncestors( + const TableMetadata& metadata) { + auto current_result = metadata.Snapshot(); + ICEBERG_ACTION_FOR_NOT_FOUND(current_result, return {}); + + // Create a lookup function that uses the metadata + auto lookup = [&metadata](int64_t id) -> Result> { + return metadata.SnapshotById(id); + }; + + return AncestorsOf(current_result.value(), lookup); +} + Result> SnapshotUtil::CurrentAncestorIds(const Table& table) { return CurrentAncestors(table).and_then(ToIds); } @@ -116,7 +145,7 @@ Result>> SnapshotUtil::OldestAncestorAft std::optional> last_snapshot = std::nullopt; ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(table, current)); - for (const auto& snapshot : ancestors) { + for (auto& snapshot : ancestors) { auto snapshot_timestamp_ms = snapshot->timestamp_ms; if (snapshot_timestamp_ms < timestamp_ms) { return last_snapshot; diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h index ca106cb33..0a000c691 100644 --- a/src/iceberg/util/snapshot_util_internal.h +++ b/src/iceberg/util/snapshot_util_internal.h @@ -70,6 +70,15 @@ class ICEBERG_EXPORT SnapshotUtil { /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot static Result IsAncestorOf(const Table& table, int64_t ancestor_snapshot_id); + /// \brief Returns whether ancestor_snapshot_id is an ancestor of the metadata's current + /// state. + /// + /// \param metadata The table metadata to check + /// \param ancestor_snapshot_id The ancestor snapshot ID to check for + /// \return true if ancestor_snapshot_id is an ancestor of the current snapshot + static Result IsAncestorOf(const TableMetadata& metadata, + int64_t ancestor_snapshot_id); + /// \brief Returns whether some ancestor of snapshot_id has parentId matches /// ancestor_parent_snapshot_id. /// @@ -88,6 +97,14 @@ class ICEBERG_EXPORT SnapshotUtil { static Result>> CurrentAncestors( const Table& table); + /// \brief Returns a vector that traverses the metadata's snapshots from the current to + /// the last known ancestor. + /// + /// \param metadata The table metadata + /// \return A vector from the metadata's current snapshot to its last known ancestor + static Result>> CurrentAncestors( + const TableMetadata& metadata); + /// \brief Returns the snapshot IDs for the ancestors of the current table state. /// /// Ancestor IDs are ordered by commit time, descending. The first ID is the current