Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/iceberg/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ set(ICEBERG_SOURCES
update/update_properties.cc
update/update_schema.cc
update/update_sort_order.cc
update/update_snapshot_reference.cc
util/bucket_util.cc
util/content_file_util.cc
util/conversions.cc
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ iceberg_sources = files(
'update/update_partition_spec.cc',
'update/update_properties.cc',
'update/update_schema.cc',
'update/update_snapshot_reference.cc',
'update/update_sort_order.cc',
'util/bucket_util.cc',
'util/content_file_util.cc',
Expand Down
2 changes: 1 addition & 1 deletion src/iceberg/parquet/parquet_data_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ Result<std::shared_ptr<::arrow::Array>> ProjectStructArray(
return output_array;
}

/// Templated implementation for projecting list arrays.
/// \brief Templated implementation for projecting list arrays.
/// Works with both ListArray/ListType (32-bit offsets) and
/// LargeListArray/LargeListType (64-bit offsets).
template <typename ArrowListArrayType, typename ArrowListType>
Expand Down
27 changes: 27 additions & 0 deletions src/iceberg/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "iceberg/update/update_partition_spec.h"
#include "iceberg/update/update_properties.h"
#include "iceberg/update/update_schema.h"
#include "iceberg/update/update_snapshot_reference.h"
#include "iceberg/update/update_sort_order.h"
#include "iceberg/util/checked_cast.h"
#include "iceberg/util/location_util.h"
Expand Down Expand Up @@ -183,6 +184,24 @@ Status Transaction::Apply(PendingUpdate& update) {
metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove));
}
} break;
case PendingUpdate::Kind::kUpdateSnapshotReference: {
auto& update_ref = internal::checked_cast<UpdateSnapshotReference&>(update);
ICEBERG_ASSIGN_OR_RAISE(auto updated_refs, update_ref.Apply());
const auto& current_refs = current().refs;
// Identify references which have been removed
for (const auto& [name, ref] : current_refs) {
if (updated_refs.find(name) == updated_refs.end()) {
metadata_builder_->RemoveRef(name);
}
}
// Identify references which have been created or updated
for (const auto& [name, ref] : updated_refs) {
auto current_it = current_refs.find(name);
if (current_it == current_refs.end() || *current_it->second != *ref) {
metadata_builder_->SetRef(name, ref);
}
}
} break;
default:
return NotSupported("Unsupported pending update: {}",
static_cast<int32_t>(update.kind()));
Expand Down Expand Up @@ -280,4 +299,12 @@ Result<std::shared_ptr<ExpireSnapshots>> Transaction::NewExpireSnapshots() {
return expire_snapshots;
}

/// \brief Create an UpdateSnapshotReference bound to this transaction and register it
/// as a pending update via AddUpdate().
///
/// \return The newly created update, or the error produced by
/// UpdateSnapshotReference::Make or AddUpdate.
Result<std::shared_ptr<UpdateSnapshotReference>>
Transaction::NewUpdateSnapshotReference() {
  auto self = shared_from_this();
  ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr<UpdateSnapshotReference> ref_update,
                          UpdateSnapshotReference::Make(std::move(self)));
  // Register the update with the transaction before handing it to the caller.
  ICEBERG_RETURN_UNEXPECTED(AddUpdate(ref_update));
  return ref_update;
}

} // namespace iceberg
4 changes: 4 additions & 0 deletions src/iceberg/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this<Transacti
/// changes.
Result<std::shared_ptr<ExpireSnapshots>> NewExpireSnapshots();

/// \brief Create a new UpdateSnapshotReference to update snapshot references (branches
/// and tags) and commit the changes.
Result<std::shared_ptr<UpdateSnapshotReference>> NewUpdateSnapshotReference();

private:
Transaction(std::shared_ptr<Table> table, Kind kind, bool auto_commit,
std::unique_ptr<TableMetadataBuilder> metadata_builder);
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/type_fwd.h
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,7 @@ class UpdateProperties;
class UpdateSchema;
class UpdateSortOrder;
class ExpireSnapshots;
class UpdateSnapshotReference;

/// ----------------------------------------------------------------------------
/// TODO: Forward declarations below are not added yet.
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ install_headers(
'snapshot_update.h',
'update_partition_spec.h',
'update_schema.h',
'update_snapshot_reference.h',
'update_sort_order.h',
'update_properties.h',
],
Expand Down
1 change: 1 addition & 0 deletions src/iceberg/update/pending_update.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector {
kUpdateSchema,
kUpdateSnapshot,
kUpdateSortOrder,
kUpdateSnapshotReference,
};

/// \brief Return the kind of this pending update.
Expand Down
228 changes: 228 additions & 0 deletions src/iceberg/update/update_snapshot_reference.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,228 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

#include "iceberg/update/update_snapshot_reference.h"

#include <memory>
#include <optional>
#include <string>
#include <unordered_map>

#include "iceberg/result.h"
#include "iceberg/snapshot.h"
#include "iceberg/table_metadata.h"
#include "iceberg/transaction.h"
#include "iceberg/util/error_collector.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/snapshot_util_internal.h"

namespace iceberg {

/// \brief Factory for UpdateSnapshotReference.
///
/// \param transaction The transaction this update belongs to; must not be null.
/// \return A shared pointer to the new update, or the ICEBERG_PRECHECK failure when
/// `transaction` is null.
Result<std::shared_ptr<UpdateSnapshotReference>> UpdateSnapshotReference::Make(
    std::shared_ptr<Transaction> transaction) {
  ICEBERG_PRECHECK(transaction != nullptr,
                   "Cannot create UpdateSnapshotReference without a transaction");
  // The object is created with a plain `new` expression (as in the original code) and
  // ownership is handed to the shared_ptr immediately.
  auto* raw_update = new UpdateSnapshotReference(std::move(transaction));
  return std::shared_ptr<UpdateSnapshotReference>(raw_update);
}

/// \brief Construct the update and seed the staged refs from the base metadata.
///
/// Each SnapshotRef is copied into a freshly allocated object instead of sharing the
/// base metadata's pointer. The mutators of this class (ReplaceBranch, ReplaceTag,
/// SetMinSnapshotsToKeep, SetMaxSnapshotAgeMs, SetMaxRefAgeMs, ...) write through
/// `updated_refs_` in place; if the pointers were shared, those writes would modify the
/// base metadata before anything is committed, and a staged ref would always compare
/// equal to the ref it was derived from.
///
/// \param transaction The owning transaction, forwarded to PendingUpdate.
UpdateSnapshotReference::UpdateSnapshotReference(std::shared_ptr<Transaction> transaction)
    : PendingUpdate(std::move(transaction)) {
  for (const auto& [name, ref] : base().refs) {
    // Deep-copy so later in-place edits only affect the staged state.
    updated_refs_[name] = std::make_shared<SnapshotRef>(*ref);
  }
}

// Defaulted destructor, defined out of line in this translation unit.
UpdateSnapshotReference::~UpdateSnapshotReference() = default;

/// \brief Stage a new branch called `name` that points at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, when SnapshotRef::MakeBranch does not
/// produce a ref, or when a ref with the same name is already staged.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::CreateBranch(const std::string& name,
                                                               int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_branch, SnapshotRef::MakeBranch(snapshot_id));
  // emplace leaves an existing entry untouched and reports that through `.second`.
  const bool was_added = updated_refs_.emplace(name, std::move(new_branch)).second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}

/// \brief Stage a new tag called `name` that points at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, when SnapshotRef::MakeTag does not
/// produce a ref, or when a ref with the same name is already staged.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::CreateTag(const std::string& name,
                                                            int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
  ICEBERG_BUILDER_ASSIGN_OR_RETURN(auto new_tag, SnapshotRef::MakeTag(snapshot_id));
  // emplace leaves an existing entry untouched and reports that through `.second`.
  const bool was_added = updated_refs_.emplace(name, std::move(new_tag)).second;
  ICEBERG_BUILDER_CHECK(was_added, "Ref '{}' already exists", name);
  return *this;
}

/// \brief Stage removal of the branch called `name`.
///
/// The builder checks fail when `name` is empty, names the main branch, is not a staged
/// ref, or refers to a tag.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveBranch(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot remove main branch");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
  const bool is_branch = entry->second->type() == SnapshotRefType::kBranch;
  ICEBERG_BUILDER_CHECK(is_branch, "Ref '{}' is a tag not a branch", name);
  updated_refs_.erase(entry);
  return *this;
}

/// \brief Stage removal of the tag called `name`.
///
/// The builder checks fail when `name` is empty, is not a staged ref, or refers to a
/// branch.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::RemoveTag(const std::string& name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);
  const bool is_tag = entry->second->type() == SnapshotRefType::kTag;
  ICEBERG_BUILDER_CHECK(is_tag, "Ref '{}' is a branch not a tag", name);
  updated_refs_.erase(entry);
  return *this;
}

/// \brief Stage renaming the branch `name` to `new_name`.
///
/// The builder checks fail when either name is empty, when `name` is the main branch,
/// when `name` is not a staged ref or refers to a tag, or when a ref called `new_name`
/// is already staged (which includes `name == new_name`).
///
/// \param name Current name of the branch.
/// \param new_name Name the branch should have afterwards.
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::RenameBranch(
    const std::string& name, const std::string& new_name) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch to rename cannot be empty");
  ICEBERG_BUILDER_CHECK(!new_name.empty(), "New branch name cannot be empty");
  ICEBERG_BUILDER_CHECK(name != SnapshotRef::kMainBranch, "Cannot rename main branch");
  auto it = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(it != updated_refs_.end(), "Branch does not exist: {}", name);
  ICEBERG_BUILDER_CHECK(it->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  auto [_, inserted] = updated_refs_.emplace(new_name, it->second);
  ICEBERG_BUILDER_CHECK(inserted, "Ref '{}' already exists", new_name);
  // A successful insertion may rehash the table, which invalidates `it`. Erase the old
  // entry by key instead of through the (possibly stale) iterator.
  updated_refs_.erase(name);
  return *this;
}

/// \brief Stage pointing the existing branch `name` at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, is not a staged ref, or refers to a
/// tag. The staged ref's snapshot id is overwritten in place.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& name,
                                                                int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
  auto& branch_ref = *entry->second;
  ICEBERG_BUILDER_CHECK(branch_ref.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  branch_ref.snapshot_id = snapshot_id;
  return *this;
}

/// \brief Stage pointing branch `from` at the snapshot referenced by `to`, without
/// requiring an ancestry relationship between the two.
///
/// Delegates to ReplaceBranchInternal with fast-forward checking disabled.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranch(const std::string& from,
                                                                const std::string& to) {
  return ReplaceBranchInternal(from, to, /*fast_forward=*/false);
}

/// \brief Stage fast-forwarding branch `from` to the snapshot referenced by `to`.
///
/// Delegates to ReplaceBranchInternal with fast-forward checking enabled.
UpdateSnapshotReference& UpdateSnapshotReference::FastForward(const std::string& from,
                                                              const std::string& to) {
  return ReplaceBranchInternal(from, to, /*fast_forward=*/true);
}

/// \brief Shared implementation of ReplaceBranch(from, to) and FastForward(from, to).
///
/// Points branch `from` at the snapshot currently referenced by `to`:
///  - `to` must be a staged ref;
///  - when `from` is not staged yet, it is created as a new branch at `to`'s snapshot;
///  - when `from` exists it must be a branch;
///  - when both already reference the same snapshot, nothing changes;
///  - when `fast_forward` is set, the snapshot `from` currently references must be an
///    ancestor of the snapshot `to` references, otherwise the builder check fails.
///
/// \param from Branch to update (or create).
/// \param to Ref whose snapshot `from` should point at.
/// \param fast_forward Whether to enforce the ancestry requirement.
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceBranchInternal(
    const std::string& from, const std::string& to, bool fast_forward) {
  ICEBERG_BUILDER_CHECK(!from.empty(), "Branch to update cannot be empty");
  ICEBERG_BUILDER_CHECK(!to.empty(), "Destination ref cannot be empty");
  auto to_it = updated_refs_.find(to);
  ICEBERG_BUILDER_CHECK(to_it != updated_refs_.end(), "Ref does not exist: {}", to);

  auto from_it = updated_refs_.find(from);
  if (from_it == updated_refs_.end()) {
    return CreateBranch(from, to_it->second->snapshot_id);
  }

  ICEBERG_BUILDER_CHECK(from_it->second->type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", from);

  // Nothing to replace if snapshot IDs are the same
  if (to_it->second->snapshot_id == from_it->second->snapshot_id) {
    return *this;
  }

  if (fast_forward) {
    const auto& base_metadata = transaction_->current();
    // A fast-forward only moves `from` forward in history, so the snapshot that `from`
    // references now must be reachable from `to`'s snapshot.
    // NOTE(review): this assumes IsAncestorOf takes (snapshot_id, ancestor_snapshot_id,
    // lookup), mirroring the Java SnapshotUtil.isAncestorOf -- confirm against
    // snapshot_util_internal.h. The previous argument order tested the opposite
    // direction (`to` being an ancestor of `from`).
    ICEBERG_BUILDER_ASSIGN_OR_RETURN(
        auto from_is_ancestor_of_to,
        SnapshotUtil::IsAncestorOf(
            to_it->second->snapshot_id, from_it->second->snapshot_id,
            [&base_metadata](int64_t id) { return base_metadata.SnapshotById(id); }));

    ICEBERG_BUILDER_CHECK(from_is_ancestor_of_to,
                          "Cannot fast-forward: {} is not an ancestor of {}", from, to);
  }

  from_it->second->snapshot_id = to_it->second->snapshot_id;
  return *this;
}

/// \brief Stage pointing the existing tag `name` at `snapshot_id`.
///
/// The builder checks fail when `name` is empty, is not a staged ref, or refers to a
/// branch. The staged ref's snapshot id is overwritten in place.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::ReplaceTag(const std::string& name,
                                                             int64_t snapshot_id) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Tag name cannot be empty");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Tag does not exist: {}", name);
  auto& tag_ref = *entry->second;
  ICEBERG_BUILDER_CHECK(tag_ref.type() == SnapshotRefType::kTag,
                        "Ref '{}' is a branch not a tag", name);
  tag_ref.snapshot_id = snapshot_id;
  return *this;
}

/// \brief Stage a new `min_snapshots_to_keep` retention value for branch `name`.
///
/// The builder checks fail when `name` is empty, is not a staged ref, or refers to a
/// tag. The value is written first and the ref is then re-validated with
/// SnapshotRef::Validate(); a failed validation fails the final builder check.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::SetMinSnapshotsToKeep(
    const std::string& name, int32_t min_snapshots_to_keep) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
  auto& branch_ref = *entry->second;
  ICEBERG_BUILDER_CHECK(branch_ref.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  auto& branch_retention = std::get<SnapshotRef::Branch>(branch_ref.retention);
  branch_retention.min_snapshots_to_keep = min_snapshots_to_keep;
  ICEBERG_BUILDER_CHECK(branch_ref.Validate(),
                        "Invalid min_snapshots_to_keep {} for branch '{}'",
                        min_snapshots_to_keep, name);
  return *this;
}

/// \brief Stage a new `max_snapshot_age_ms` retention value for branch `name`.
///
/// The builder checks fail when `name` is empty, is not a staged ref, or refers to a
/// tag. The value is written first and the ref is then re-validated with
/// SnapshotRef::Validate(); a failed validation fails the final builder check.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxSnapshotAgeMs(
    const std::string& name, int64_t max_snapshot_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Branch name cannot be empty");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Branch does not exist: {}", name);
  auto& branch_ref = *entry->second;
  ICEBERG_BUILDER_CHECK(branch_ref.type() == SnapshotRefType::kBranch,
                        "Ref '{}' is a tag not a branch", name);
  auto& branch_retention = std::get<SnapshotRef::Branch>(branch_ref.retention);
  branch_retention.max_snapshot_age_ms = max_snapshot_age_ms;
  ICEBERG_BUILDER_CHECK(branch_ref.Validate(),
                        "Invalid max_snapshot_age_ms {} for branch '{}'",
                        max_snapshot_age_ms, name);
  return *this;
}

/// \brief Stage a new `max_ref_age_ms` retention value for the ref `name`.
///
/// Works for both branches and tags: the value is stored in whichever retention
/// alternative matches the ref's type. The builder checks fail when `name` is empty or
/// is not a staged ref. The value is written first and the ref is then re-validated
/// with SnapshotRef::Validate(); a failed validation fails the final builder check.
///
/// \return *this, to allow call chaining.
UpdateSnapshotReference& UpdateSnapshotReference::SetMaxRefAgeMs(const std::string& name,
                                                                 int64_t max_ref_age_ms) {
  ICEBERG_BUILDER_CHECK(!name.empty(), "Reference name cannot be empty");
  const auto entry = updated_refs_.find(name);
  ICEBERG_BUILDER_CHECK(entry != updated_refs_.end(), "Ref does not exist: {}", name);
  auto& target_ref = *entry->second;
  const bool is_branch = target_ref.type() == SnapshotRefType::kBranch;
  if (is_branch) {
    auto& branch_retention = std::get<SnapshotRef::Branch>(target_ref.retention);
    branch_retention.max_ref_age_ms = max_ref_age_ms;
  } else {
    auto& tag_retention = std::get<SnapshotRef::Tag>(target_ref.retention);
    tag_retention.max_ref_age_ms = max_ref_age_ms;
  }
  ICEBERG_BUILDER_CHECK(target_ref.Validate(), "Invalid max_ref_age_ms {} for ref '{}'",
                        max_ref_age_ms, name);
  return *this;
}

/// \brief Produce the staged set of snapshot references.
///
/// \return A copy of the staged name -> SnapshotRef map, or the error reported by
/// CheckErrors() when it does not succeed.
Result<std::unordered_map<std::string, std::shared_ptr<SnapshotRef>>>
UpdateSnapshotReference::Apply() {
  ICEBERG_RETURN_UNEXPECTED(CheckErrors());
  // Hand out a copy of the map; the staged state held by this object is left intact.
  std::unordered_map<std::string, std::shared_ptr<SnapshotRef>> staged_refs =
      updated_refs_;
  return staged_refs;
}

} // namespace iceberg
Loading
Loading