diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt
index 86a0efd7e..122047e5b 100644
--- a/src/iceberg/CMakeLists.txt
+++ b/src/iceberg/CMakeLists.txt
@@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
     update/update_properties.cc
     update/update_schema.cc
+    update/update_snapshot_reference.cc
     update/update_sort_order.cc
     util/bucket_util.cc
     util/content_file_util.cc
     util/conversions.cc
diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build
index 87f508cd5..d06ddeff3 100644
--- a/src/iceberg/meson.build
+++ b/src/iceberg/meson.build
@@ -108,6 +108,7 @@ iceberg_sources = files(
     'update/update_partition_spec.cc',
     'update/update_properties.cc',
     'update/update_schema.cc',
+    'update/update_snapshot_reference.cc',
     'update/update_sort_order.cc',
     'util/bucket_util.cc',
     'util/content_file_util.cc',
diff --git a/src/iceberg/parquet/parquet_data_util.cc b/src/iceberg/parquet/parquet_data_util.cc
index 14d20ff9e..43efd1cbd 100644
--- a/src/iceberg/parquet/parquet_data_util.cc
+++ b/src/iceberg/parquet/parquet_data_util.cc
@@ -148,7 +148,7 @@ Result> ProjectStructArray(
   return output_array;
 }
 
-/// Templated implementation for projecting list arrays.
+/// \brief Templated implementation for projecting list arrays.
 /// Works with both ListArray/ListType (32-bit offsets) and
 /// LargeListArray/LargeListType (64-bit offsets).
 template
diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc
index f763c5670..794edca66 100644
--- a/src/iceberg/transaction.cc
+++ b/src/iceberg/transaction.cc
@@ -36,6 +36,7 @@
 #include "iceberg/update/update_partition_spec.h"
 #include "iceberg/update/update_properties.h"
 #include "iceberg/update/update_schema.h"
+#include "iceberg/update/update_snapshot_reference.h"
 #include "iceberg/update/update_sort_order.h"
 #include "iceberg/util/checked_cast.h"
 #include "iceberg/util/location_util.h"
@@ -183,6 +184,26 @@ Status Transaction::Apply(PendingUpdate& update) {
         metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
       }
     } break;
+    case PendingUpdate::Kind::kUpdateSnapshotReference: {
+      auto& update_ref = internal::checked_cast(update);
+      ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply());
+      // Copy the ref map: RemoveRef()/SetRef() below may mutate the metadata behind
+      // current(), and erasing from a map while range-iterating it is undefined.
+      const auto current_refs = current().refs;
+      // Identify references which have been removed
+      for (const auto& [name, ref] : current_refs) {
+        if (updated_refs.find(name) == updated_refs.end()) {
+          metadata_builder_->RemoveRef(name);
+        }
+      }
+      // Identify references which have been created or updated
+      for (const auto& [name, ref] : updated_refs) {
+        auto current_it = current_refs.find(name);
+        if (current_it == current_refs.end() || *current_it->second != *ref) {
+          metadata_builder_->SetRef(name, ref);
+        }
+      }
+    } break;
     default:
       return NotSupported("Unsupported pending update: {}",
                           static_cast(update.kind()));
@@ -280,4 +301,12 @@ Result> Transaction::NewExpireSnapshots() {
   return expire_snapshots;
 }
 
+Result>
+Transaction::NewUpdateSnapshotReference() {
+  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_ref,
+                          UpdateSnapshotReference::Make(shared_from_this()));
+  ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_ref));
+  return update_ref;
+}
+
 }  // namespace iceberg
diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h
index 057a27a90..a206bd924 100644
--- a/src/iceberg/transaction.h
+++ b/src/iceberg/transaction.h
@@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewExpireSnapshots();
 
+  /// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
+  /// and tags) and commit the changes.
+  Result> NewUpdateSnapshotReference();
+
  private:
   Transaction(std::shared_ptr table, Kind kind, bool auto_commit,
               std::unique_ptr metadata_builder);
diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h
index c8854031d..ad4512421 100644
--- a/src/iceberg/type_fwd.h
+++ b/src/iceberg/type_fwd.h
@@ -194,6 +194,7 @@ class UpdateProperties;
 class UpdateSchema;
 class UpdateSortOrder;
 class ExpireSnapshots;
+class UpdateSnapshotReference;
 
 /// ----------------------------------------------------------------------------
 /// TODO: Forward declarations below are not added yet.
diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build
index 4238e0222..cff8664d9 100644
--- a/src/iceberg/update/meson.build
+++ b/src/iceberg/update/meson.build
@@ -21,6 +21,7 @@ install_headers(
         'snapshot_update.h',
         'update_partition_spec.h',
         'update_schema.h',
+        'update_snapshot_reference.h',
         'update_sort_order.h',
         'update_properties.h',
     ],
diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h
index 8a8329eee..444a405ef 100644
--- a/src/iceberg/update/pending_update.h
+++ b/src/iceberg/update/pending_update.h
@@ -48,6 +48,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
     kUpdateSchema,
     kUpdateSnapshot,
     kUpdateSortOrder,
+    kUpdateSnapshotReference,
   };
 
   /// \brief Return the kind of this pending update.
diff --git a/src/iceberg/update/update_snapshot_reference.cc b/src/iceberg/update/update_snapshot_reference.cc
new file mode 100644
index 000000000..ede118164
--- /dev/null
+++ b/src/iceberg/update/update_snapshot_reference.cc
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "iceberg/update/update_snapshot_reference.h"
+
+#include
+#include
+#include
+#include
+
+#include "iceberg/result.h"
+#include "iceberg/snapshot.h"
+#include "iceberg/table_metadata.h"
+#include "iceberg/transaction.h"
+#include "iceberg/util/error_collector.h"
+#include "iceberg/util/macros.h"
+#include "iceberg/util/snapshot_util_internal.h"
+
+namespace iceberg {
+
+Result> UpdateSnapshotReference::Make(
+    std::shared_ptr transaction) {
+  ICEBERG_PRECHECK(transaction != nullptr,
+                   "Cannot create UpdateSnapshotReference without a transaction");
+  return std::shared_ptr(
+      new UpdateSnapshotReference(std::move(transaction)));
+}
+
+UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr transaction)
+    : PendingUpdate(std::move(transaction)) {
+  // Deep-copy every ref: the setters below edit refs in place and must not alias base()
+  for (const auto& [name, ref] : base().refs) {
+    updated_refs_[name] = std::make_shared<SnapshotRef>(*ref);
+  }
+}
+
+UpdateSnapshotReference::~UpdateSnapshotReference() = default;
+
+UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name,
+                                                               int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto branch, SnapshotRef::MakeBranch(snapshot_id));
+  auto [_, inserted] = updated_refs_.emplace(name, std::move(branch));
+  ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name,
+                                                            int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
+  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto tag, SnapshotRef::MakeTag(snapshot_id));
+  auto [_, inserted] = updated_refs_.emplace(name, std::move(tag));
+  ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  updated_refs_.erase(it);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag,
+                        "Ref '{}' is a branch not a tag", name);
+  updated_refs_.erase(it);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch(
+    const std::string& name, const std::string& new_name) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty");
+  ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty");
+  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  auto [_, inserted] = updated_refs_.emplace(new_name, it->second);
+  ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", new_name);
+  updated_refs_.erase(name);  // by key: emplace above may rehash and invalidate `it`
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name,
+                                                                int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  it->second->snapshot_id = snapshot_id;
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from,
+                                                                const std::string& to) {
+  return ReplaceBranchInternal(from, to, false);
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from,
+                                                              const std::string& to) {
+  return ReplaceBranchInternal(from, to, true);
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
+    const std::string& from, const std::string& to, bool fast_forward) {
+  ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty");
+  ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty");
+  auto to_it = updated_refs_.find(to);
+  ICEBERG_BUILDER_CHECK(to_it != updated_refs_.end(), "Ref does not exist: {}", to);
+
+  auto from_it = updated_refs_.find(from);
+  if (from_it == updated_refs_.end()) {
+    return CreateBranch(from, to_it->second->snapshot_id);
+  }
+
+  ICEBERG_BUILDER_CHECK(from_it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", from);
+
+  // Nothing to replace if snapshot IDs are the same
+  if (to_it->second->snapshot_id == from_it->second->snapshot_id) {
+    return *this;
+  }
+
+  if (fast_forward) {
+    // A fast-forward requires from's snapshot to be an ancestor of to's snapshot.
+    const auto& base_metadata = transaction_->current();
+    ICEBERG_BUILDER_ASSIGN_OR_RETURN(
+        auto from_is_ancestor,
+        SnapshotUtil::IsAncestorOf(
+            to_it->second->snapshot_id, from_it->second->snapshot_id,
+            [&base_metadata](int64_t id) { return base_metadata.SnapshotById(id); }));
+    ICEBERG_BUILDER_CHECK(from_is_ancestor,
+                          "Cannot fast-forward: {} is not an ancestor of {}", from, to);
+  }
+
+  from_it->second->snapshot_id = to_it->second->snapshot_id;
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name,
+                                                             int64_t snapshot_id) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Tag does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kTag,
+                        "Ref '{}' is a branch not a tag", name);
+  it->second->snapshot_id = snapshot_id;
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
+    const std::string& name, int32_t min_snapshots_to_keep) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  std::get(it->second->retention).min_snapshots_to_keep =
+      min_snapshots_to_keep;
+  ICEBERG_BUILDER_CHECK(it->second->Validate(),
+                        "Invalid min_snapshots_to_keep {} for branch '{}'",
+                        min_snapshots_to_keep, name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
+    const std::string& name, int64_t max_snapshot_age_ms) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
+  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
+                        "Ref '{}' is a tag not a branch", name);
+  std::get(it->second->retention).max_snapshot_age_ms =
+      max_snapshot_age_ms;
+  ICEBERG_BUILDER_CHECK(it->second->Validate(),
+                        "Invalid max_snapshot_age_ms {} for branch '{}'",
+                        max_snapshot_age_ms, name);
+  return *this;
+}
+
+UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name,
+                                                                 int64_t max_ref_age_ms) {
+  ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");
+  auto it = updated_refs_.find(name);
+  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Ref does not exist: {}", name);
+  if (it->second->type() == SnapshotRefType::kBranch) {
+    std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms;
+  } else {
+    std::get(it->second->retention).max_ref_age_ms = max_ref_age_ms;
+  }
+  ICEBERG_BUILDER_CHECK(it->second->Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
+                        max_ref_age_ms, name);
+  return *this;
+}
+
+Result>>
+UpdateSnapshotReference::Apply() {
+  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
+  return updated_refs_;
+}
+
+}  // namespace iceberg
diff --git a/src/iceberg/update/update_snapshot_reference.h b/src/iceberg/update/update_snapshot_reference.h
new file mode 100644
index 000000000..84474c045
--- /dev/null
+++ b/src/iceberg/update/update_snapshot_reference.h
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#pragma once
+
+#include
+#include
+#include
+#include
+
+#include "iceberg/iceberg_export.h"
+#include "iceberg/result.h"
+#include "iceberg/type_fwd.h"
+#include "iceberg/update/pending_update.h"
+
+/// \file iceberg/update/update_snapshot_reference.h
+
+namespace iceberg {
+
+/// \brief Updates snapshot references.
+///
+/// TODO(xxx): Add SetSnapshotOperation operations such as setCurrentSnapshot,
+/// rollBackTime, rollbackTo to this class so that we can support those operations for
+/// refs.
+class ICEBERG_EXPORT UpdateSnapshotReference : public PendingUpdate {
+ public:
+  static Result> Make(
+      std::shared_ptr transaction);
+
+  ~UpdateSnapshotReference() override;
+
+  /// \brief Create a branch reference.
+  ///
+  /// \param name The branch name
+  /// \param snapshot_id The snapshot ID for the branch
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& CreateBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Create a tag reference.
+  ///
+  /// \param name The tag name
+  /// \param snapshot_id The snapshot ID for the tag
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& CreateTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Remove a branch reference.
+  ///
+  /// \param name The branch name to remove
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& RemoveBranch(const std::string& name);
+
+  /// \brief Remove a tag reference.
+  ///
+  /// \param name The tag name to remove
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& RemoveTag(const std::string& name);
+
+  /// \brief Rename a branch reference.
+  ///
+  /// \param name The current branch name
+  /// \param new_name The new branch name
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& RenameBranch(const std::string& name,
+                                        const std::string& new_name);
+
+  /// \brief Replace a branch reference with a new snapshot ID.
+  ///
+  /// \param name The branch name
+  /// \param snapshot_id The new snapshot ID
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& ReplaceBranch(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Replace a branch reference with another reference's snapshot ID.
+  ///
+  /// If `from` does not exist, it is created as a branch at `to`'s snapshot.
+  ///
+  /// \param from The branch name to update
+  /// \param to The reference name to copy the snapshot ID from
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& ReplaceBranch(const std::string& from, const std::string& to);
+
+  /// \brief Fast-forward a branch to another reference's snapshot ID.
+  ///
+  /// This is similar to ReplaceBranch but validates that the branch's current snapshot
+  /// is an ancestor of the target reference's snapshot. If `from` does not exist, it is
+  /// created as a branch at `to`'s snapshot.
+  ///
+  /// \param from The branch name to update
+  /// \param to The reference name to copy the snapshot ID from
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& FastForward(const std::string& from, const std::string& to);
+
+  /// \brief Replace a tag reference with a new snapshot ID.
+  ///
+  /// \param name The tag name
+  /// \param snapshot_id The new snapshot ID
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& ReplaceTag(const std::string& name, int64_t snapshot_id);
+
+  /// \brief Set the minimum number of snapshots to keep for a branch.
+  ///
+  /// \param name The branch name
+  /// \param min_snapshots_to_keep The minimum number of snapshots to keep
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& SetMinSnapshotsToKeep(const std::string& name,
+                                                 int32_t min_snapshots_to_keep);
+
+  /// \brief Set the maximum snapshot age in milliseconds for a branch.
+  ///
+  /// \param name The branch name
+  /// \param max_snapshot_age_ms The maximum snapshot age in milliseconds
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& SetMaxSnapshotAgeMs(const std::string& name,
+                                               int64_t max_snapshot_age_ms);
+
+  /// \brief Set the maximum reference age in milliseconds.
+  ///
+  /// \param name The reference name
+  /// \param max_ref_age_ms The maximum reference age in milliseconds
+  /// \return Reference to this for method chaining
+  UpdateSnapshotReference& SetMaxRefAgeMs(const std::string& name,
+                                          int64_t max_ref_age_ms);
+
+  Kind kind() const final { return Kind::kUpdateSnapshotReference; }
+
+  /// \brief Apply the pending changes and return the updated references.
+  Result>> Apply();
+
+ private:
+  explicit UpdateSnapshotReference(std::shared_ptr transaction);
+
+  UpdateSnapshotReference& ReplaceBranchInternal(const std::string& from,
+                                                 const std::string& to,
+                                                 bool fast_forward);
+
+  std::unordered_map> updated_refs_;
+};
+
+}  // namespace iceberg
diff --git a/src/iceberg/util/snapshot_util.cc b/src/iceberg/util/snapshot_util.cc
index c3b93be8b..b8b85787d 100644
--- a/src/iceberg/util/snapshot_util.cc
+++ b/src/iceberg/util/snapshot_util.cc
@@ -62,6 +62,17 @@ Result SnapshotUtil::IsAncestorOf(const Table& table, int64_t snapshot_id,
   });
 }
 
+Result SnapshotUtil::IsAncestorOf(
+    int64_t snapshot_id, int64_t ancestor_snapshot_id,
+    const std::function>(int64_t)>& lookup) {
+  ICEBERG_ASSIGN_OR_RAISE(auto snapshot, lookup(snapshot_id));
+  ICEBERG_CHECK(snapshot != nullptr, "Cannot find snapshot: {}", snapshot_id);
+  ICEBERG_ASSIGN_OR_RAISE(auto ancestors, AncestorsOf(snapshot, lookup));
+  return std::ranges::any_of(ancestors, [ancestor_snapshot_id](const auto& ancestor) {
+    return ancestor != nullptr && ancestor->snapshot_id == ancestor_snapshot_id;
+  });
+}
+
 Result SnapshotUtil::IsAncestorOf(const Table& table,
                                   int64_t ancestor_snapshot_id) {
   ICEBERG_ASSIGN_OR_RAISE(auto current, table.current_snapshot());
diff --git a/src/iceberg/util/snapshot_util_internal.h b/src/iceberg/util/snapshot_util_internal.h
index ca106cb33..e97ccd222 100644
--- a/src/iceberg/util/snapshot_util_internal.h
+++ b/src/iceberg/util/snapshot_util_internal.h
@@ -62,6 +62,17 @@ class ICEBERG_EXPORT SnapshotUtil {
   static Result IsAncestorOf(const Table& table, int64_t snapshot_id,
                              int64_t ancestor_snapshot_id);
 
+  /// \brief Returns whether ancestor_snapshot_id is an ancestor of snapshot_id using the
+  /// given lookup function.
+  ///
+  /// \param snapshot_id The snapshot ID to check
+  /// \param ancestor_snapshot_id The ancestor snapshot ID to check for
+  /// \param lookup Function to lookup snapshots by ID
+  /// \return true if ancestor_snapshot_id is an ancestor of snapshot_id
+  static Result IsAncestorOf(
+      int64_t snapshot_id, int64_t ancestor_snapshot_id,
+      const std::function>(int64_t)>& lookup);
+
   /// \brief Returns whether ancestor_snapshot_id is an ancestor of the table's current
   /// state.
   ///