diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 86a0efd7e..07aa63ef2 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -88,6 +88,7 @@ set(ICEBERG_SOURCES update/update_properties.cc update/update_schema.cc update/update_sort_order.cc + update/update_statistics.cc util/bucket_util.cc util/content_file_util.cc util/conversions.cc diff --git a/src/iceberg/json_internal.cc b/src/iceberg/json_internal.cc index 7e2652d6c..c54d72b03 100644 --- a/src/iceberg/json_internal.cc +++ b/src/iceberg/json_internal.cc @@ -191,6 +191,8 @@ constexpr std::string_view kActionSetSnapshotRef = "set-snapshot-ref"; constexpr std::string_view kActionSetProperties = "set-properties"; constexpr std::string_view kActionRemoveProperties = "remove-properties"; constexpr std::string_view kActionSetLocation = "set-location"; +constexpr std::string_view kActionSetStatistics = "set-statistics"; +constexpr std::string_view kActionRemoveStatistics = "remove-statistics"; // TableUpdate field constants constexpr std::string_view kUUID = "uuid"; @@ -1399,6 +1401,18 @@ nlohmann::json ToJson(const TableUpdate& update) { json[kLocation] = u.location(); break; } + case TableUpdate::Kind::kSetStatistics: { + const auto& u = internal::checked_cast(update); + json[kAction] = kActionSetStatistics; + json[kStatistics] = ToJson(*u.statistics_file()); + break; + } + case TableUpdate::Kind::kRemoveStatistics: { + const auto& u = internal::checked_cast(update); + json[kAction] = kActionRemoveStatistics; + json[kSnapshotId] = u.snapshot_id(); + break; + } } return json; } diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 87f508cd5..8d46fe91f 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -109,6 +109,7 @@ iceberg_sources = files( 'update/update_properties.cc', 'update/update_schema.cc', 'update/update_sort_order.cc', + 'update/update_statistics.cc', 'util/bucket_util.cc', 'util/content_file_util.cc', 'util/conversions.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index c79ac53fb..f24ef972c 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -35,6 +35,7 @@ #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" +#include "iceberg/update/update_statistics.h" #include "iceberg/util/macros.h" namespace iceberg { @@ -192,6 +193,13 @@ Result> Table::NewExpireSnapshots() { return transaction->NewExpireSnapshots(); } +Result> Table::NewUpdateStatistics() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdateStatistics(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index cc9482486..5571b1cd7 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -151,6 +151,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewExpireSnapshots(); + /// \brief Create a new UpdateStatistics to update the table statistics and commit the + /// changes. + virtual Result> NewUpdateStatistics(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table_metadata.cc b/src/iceberg/table_metadata.cc index 7e357bbae..27ae28c80 100644 --- a/src/iceberg/table_metadata.cc +++ b/src/iceberg/table_metadata.cc @@ -45,6 +45,7 @@ #include "iceberg/schema.h" #include "iceberg/snapshot.h" #include "iceberg/sort_order.h" +#include "iceberg/statistics_file.h" #include "iceberg/table_properties.h" #include "iceberg/table_update.h" #include "iceberg/util/checked_cast.h" @@ -612,7 +613,7 @@ class TableMetadataBuilder::Impl { Status SetCurrentSchema(int32_t schema_id); Status RemoveSchemas(const std::unordered_set& schema_ids); Result AddSchema(const Schema& schema, int32_t new_last_column_id); - void SetLocation(std::string_view location); + Status SetLocation(std::string_view location); Status AddSnapshot(std::shared_ptr snapshot); Status SetBranchSnapshot(int64_t snapshot_id, const std::string& branch); Status SetBranchSnapshot(std::shared_ptr snapshot, const std::string& branch); @@ -620,6 +621,8 @@ class TableMetadataBuilder::Impl { Status RemoveRef(const std::string& name); Status RemoveSnapshots(const std::vector& snapshot_ids); Status RemovePartitionSpecs(const std::vector& spec_ids); + Status SetStatistics(const std::shared_ptr& statistics_file); + Status RemoveStatistics(int64_t snapshot_id); Result> Build(); @@ -1032,12 +1035,13 @@ Result TableMetadataBuilder::Impl::AddSchema(const Schema& schema, return new_schema_id; } -void TableMetadataBuilder::Impl::SetLocation(std::string_view location) { +Status TableMetadataBuilder::Impl::SetLocation(std::string_view location) { if (location == metadata_.location) { - return; + return {}; } metadata_.location = std::string(location); changes_.push_back(std::make_unique(std::string(location))); + return {}; } Status TableMetadataBuilder::Impl::AddSnapshot(std::shared_ptr snapshot) { @@ -1173,6 +1177,45 @@ Status TableMetadataBuilder::Impl::SetRef(const std::string& name, return {}; } +Status TableMetadataBuilder::Impl::SetStatistics( + const std::shared_ptr& statistics_file) { + ICEBERG_CHECK(statistics_file != nullptr, "Cannot set null statistics file"); + + // Find and replace existing statistics for the same snapshot_id, or add new one + auto it = std::ranges::find_if( + metadata_.statistics, + [snapshot_id = statistics_file->snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + + if (it != metadata_.statistics.end()) { + *it = statistics_file; + } else { + metadata_.statistics.push_back(statistics_file); + } + + changes_.push_back(std::make_unique(statistics_file)); + return {}; +} + +Status TableMetadataBuilder::Impl::RemoveStatistics(int64_t snapshot_id) { + auto it = std::ranges::find_if(metadata_.statistics, [snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + + if (it == metadata_.statistics.end()) { + return {}; + } + + // Remove statistics for the given snapshot_id + std::erase_if(metadata_.statistics, [snapshot_id](const auto& stat) { + return stat && stat->snapshot_id == snapshot_id; + }); + + changes_.push_back(std::make_unique(snapshot_id)); + return {}; +} + std::unordered_set TableMetadataBuilder::Impl::IntermediateSnapshotIdSet( int64_t current_snapshot_id) const { std::unordered_set added_snapshot_ids; @@ -1590,11 +1633,13 @@ TableMetadataBuilder& TableMetadataBuilder::SuppressHistoricalSnapshots() { TableMetadataBuilder& TableMetadataBuilder::SetStatistics( const std::shared_ptr& statistics_file) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetStatistics(statistics_file)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::RemoveStatistics(int64_t snapshot_id) { - throw IcebergError(std::format("{} not implemented", __FUNCTION__)); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->RemoveStatistics(snapshot_id)); + return *this; } TableMetadataBuilder& TableMetadataBuilder::SetPartitionStatistics( @@ -1620,7 +1665,7 @@ TableMetadataBuilder& TableMetadataBuilder::RemoveProperties( } TableMetadataBuilder& TableMetadataBuilder::SetLocation(std::string_view location) { - impl_->SetLocation(location); + ICEBERG_BUILDER_RETURN_IF_ERROR(impl_->SetLocation(location)); return *this; } diff --git a/src/iceberg/table_update.cc b/src/iceberg/table_update.cc index 7a01bdee7..5f07f24c1 100644 --- a/src/iceberg/table_update.cc +++ b/src/iceberg/table_update.cc @@ -22,6 +22,7 @@ #include "iceberg/exception.h" #include "iceberg/schema.h" #include "iceberg/sort_order.h" +#include "iceberg/statistics_file.h" #include "iceberg/table_metadata.h" #include "iceberg/table_requirements.h" @@ -446,4 +447,50 @@ std::unique_ptr SetLocation::Clone() const { return std::make_unique(location_); } +// SetStatistics + +int64_t SetStatistics::snapshot_id() const { return statistics_file_->snapshot_id; } + +void SetStatistics::ApplyTo(TableMetadataBuilder& builder) const { + builder.SetStatistics(statistics_file_); +} + +void SetStatistics::GenerateRequirements(TableUpdateContext& context) const { + // SetStatistics doesn't generate any requirements +} + +bool SetStatistics::Equals(const TableUpdate& other) const { + if (other.kind() != Kind::kSetStatistics) { + return false; + } + const auto& other_set = static_cast(other); + return *statistics_file_ == *other_set.statistics_file_; +} + +std::unique_ptr SetStatistics::Clone() const { + return std::make_unique(statistics_file_); +} + +// RemoveStatistics + +void RemoveStatistics::ApplyTo(TableMetadataBuilder& builder) const { + builder.RemoveStatistics(snapshot_id_); +} + +void RemoveStatistics::GenerateRequirements(TableUpdateContext& context) const { + // RemoveStatistics doesn't generate any requirements +} + +bool RemoveStatistics::Equals(const TableUpdate& other) const { + if (other.kind() != Kind::kRemoveStatistics) { + return false; + } + const auto& other_remove = static_cast(other); + return snapshot_id_ == other_remove.snapshot_id_; +} + +std::unique_ptr RemoveStatistics::Clone() const { + return std::make_unique(snapshot_id_); +} + } // namespace iceberg::table diff --git a/src/iceberg/table_update.h b/src/iceberg/table_update.h index 3c9c9dbbc..5bbc243ef 100644 --- a/src/iceberg/table_update.h +++ b/src/iceberg/table_update.h @@ -59,6 +59,8 @@ class ICEBERG_EXPORT TableUpdate { kSetProperties, kRemoveProperties, kSetLocation, + kSetStatistics, + kRemoveStatistics, }; virtual ~TableUpdate(); @@ -509,6 +511,53 @@ class ICEBERG_EXPORT SetLocation : public TableUpdate { std::string location_; }; +/// \brief Represents setting statistics for a snapshot +class ICEBERG_EXPORT SetStatistics : public TableUpdate { + public: + explicit SetStatistics(std::shared_ptr statistics_file) + : statistics_file_(std::move(statistics_file)) {} + + int64_t snapshot_id() const; + + const std::shared_ptr& statistics_file() const { + return statistics_file_; + } + + void ApplyTo(TableMetadataBuilder& builder) const override; + + void GenerateRequirements(TableUpdateContext& context) const override; + + Kind kind() const override { return Kind::kSetStatistics; } + + bool Equals(const TableUpdate& other) const override; + + std::unique_ptr Clone() const override; + + private: + std::shared_ptr statistics_file_; +}; + +/// \brief Represents removing statistics for a snapshot +class ICEBERG_EXPORT RemoveStatistics : public TableUpdate { + public: + explicit RemoveStatistics(int64_t snapshot_id) : snapshot_id_(snapshot_id) {} + + int64_t snapshot_id() const { return snapshot_id_; } + + void ApplyTo(TableMetadataBuilder& builder) const override; + + void GenerateRequirements(TableUpdateContext& context) const override; + + Kind kind() const override { return Kind::kRemoveStatistics; } + + bool Equals(const TableUpdate& other) const override; + + std::unique_ptr Clone() const override; + + private: + int64_t snapshot_id_; +}; + } // namespace table } // namespace iceberg diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1f6ab5521..239486fe0 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -175,7 +175,8 @@ if(ICEBERG_BUILD_BUNDLE) update_partition_spec_test.cc update_properties_test.cc update_schema_test.cc - update_sort_order_test.cc) + update_sort_order_test.cc + update_statistics_test.cc) add_iceberg_test(data_writer_test USE_BUNDLE SOURCES data_writer_test.cc) diff --git a/src/iceberg/test/update_statistics_test.cc b/src/iceberg/test/update_statistics_test.cc new file mode 100644 index 000000000..520c53196 --- /dev/null +++ b/src/iceberg/test/update_statistics_test.cc @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_statistics.h" + +#include + +#include +#include + +#include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" + +namespace iceberg { + +class UpdateStatisticsTest : public UpdateTestBase { + protected: + // Helper function to create a statistics file + std::shared_ptr MakeStatisticsFile(int64_t snapshot_id, + const std::string& path, + int64_t file_size = 1024, + int64_t footer_size = 128) { + auto stats_file = std::make_shared(); + stats_file->snapshot_id = snapshot_id; + stats_file->path = path; + stats_file->file_size_in_bytes = file_size; + stats_file->file_footer_size_in_bytes = footer_size; + + BlobMetadata blob; + blob.type = "apache-datasketches-theta-v1"; + blob.source_snapshot_id = snapshot_id; + blob.source_snapshot_sequence_number = 1; + blob.fields = {1, 2}; + blob.properties = {{"ndv", "100"}}; + stats_file->blob_metadata.push_back(blob); + + return stats_file; + } +}; + +TEST_F(UpdateStatisticsTest, EmptyUpdate) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_TRUE(result.to_remove.empty()); +} + +TEST_F(UpdateStatisticsTest, SetStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + update->SetStatistics(stats_file); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + EXPECT_EQ(result.to_set.at(1), stats_file); +} + +TEST_F(UpdateStatisticsTest, SetMultipleStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file_1 = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + auto stats_file_2 = + MakeStatisticsFile(2, "/warehouse/test_table/metadata/stats-2.puffin"); + + update->SetStatistics(stats_file_1).SetStatistics(stats_file_2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 2); + EXPECT_TRUE(result.to_remove.empty()); + EXPECT_EQ(result.to_set.at(1), stats_file_1); + EXPECT_EQ(result.to_set.at(2), stats_file_2); +} + +TEST_F(UpdateStatisticsTest, RemoveStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + update->RemoveStatistics(1); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_THAT(result.to_remove, ::testing::Contains(1)); +} + +TEST_F(UpdateStatisticsTest, RemoveMultipleStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + update->RemoveStatistics(1).RemoveStatistics(2).RemoveStatistics(3); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 3); + EXPECT_THAT(result.to_remove, ::testing::UnorderedElementsAre(1, 2, 3)); +} + +TEST_F(UpdateStatisticsTest, SetAndRemoveDifferentSnapshots) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + + update->SetStatistics(stats_file).RemoveStatistics(2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_EQ(result.to_set.at(1), stats_file); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_THAT(result.to_remove, ::testing::Contains(2)); +} + +TEST_F(UpdateStatisticsTest, ReplaceStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file_1 = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1a.puffin"); + auto stats_file_2 = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1b.puffin", 2048, 256); + + // Set statistics for snapshot 1, then replace it + update->SetStatistics(stats_file_1).SetStatistics(stats_file_2); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + // Should have the second one (replacement) + EXPECT_EQ(result.to_set.at(1), stats_file_2); + EXPECT_NE(result.to_set.at(1), stats_file_1); +} + +TEST_F(UpdateStatisticsTest, SetThenRemoveSameSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + + // Set statistics for snapshot 1, then remove it + update->SetStatistics(stats_file).RemoveStatistics(1); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_TRUE(result.to_set.empty()); + EXPECT_EQ(result.to_remove.size(), 1); + EXPECT_THAT(result.to_remove, ::testing::Contains(1)); +} + +TEST_F(UpdateStatisticsTest, RemoveThenSetSameSnapshot) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(1, "/warehouse/test_table/metadata/stats-1.puffin"); + + // Remove statistics for snapshot 1, then set new ones + update->RemoveStatistics(1).SetStatistics(stats_file); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result.to_set.size(), 1); + EXPECT_TRUE(result.to_remove.empty()); + EXPECT_EQ(result.to_set.at(1), stats_file); +} + +TEST_F(UpdateStatisticsTest, SetNullStatistics) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateStatistics()); + + update->SetStatistics(nullptr); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Statistics file cannot be null")); +} + +TEST_F(UpdateStatisticsTest, CommitSuccess) { + // Test empty commit + ICEBERG_UNWRAP_OR_FAIL(auto empty_update, table_->NewUpdateStatistics()); + EXPECT_THAT(empty_update->Commit(), IsOk()); + + // Reload table after first commit + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + + // Get a valid snapshot ID from the table + ICEBERG_UNWRAP_OR_FAIL(auto current_snapshot, reloaded->current_snapshot()); + int64_t snapshot_id = current_snapshot->snapshot_id; + + // Test commit with statistics changes + ICEBERG_UNWRAP_OR_FAIL(auto update, reloaded->NewUpdateStatistics()); + auto stats_file = + MakeStatisticsFile(snapshot_id, "/warehouse/test_table/metadata/stats-1.puffin"); + update->SetStatistics(stats_file); + + EXPECT_THAT(update->Commit(), IsOk()); + + // Verify the statistics were committed and persisted + ICEBERG_UNWRAP_OR_FAIL(auto final_table, catalog_->LoadTable(table_ident_)); + const auto& statistics = final_table->metadata()->statistics; + EXPECT_EQ(statistics.size(), 1); + EXPECT_EQ(*statistics[0], *stats_file); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index f763c5670..d9cefdb50 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -24,6 +24,7 @@ #include "iceberg/catalog.h" #include "iceberg/schema.h" +#include "iceberg/statistics_file.h" #include "iceberg/table.h" #include "iceberg/table_metadata.h" #include "iceberg/table_properties.h" @@ -37,6 +38,7 @@ #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" #include "iceberg/update/update_sort_order.h" +#include "iceberg/update/update_statistics.h" #include "iceberg/util/checked_cast.h" #include "iceberg/util/location_util.h" #include "iceberg/util/macros.h" @@ -183,6 +185,17 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove)); } } break; + case PendingUpdate::Kind::kUpdateStatistics: { + auto& update_statistics = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto result, update_statistics.Apply()); + // Apply statistics changes to the metadata builder + for (const auto& [snapshot_id, stat_file] : result.to_set) { + metadata_builder_->SetStatistics(stat_file); + } + for (const auto& snapshot_id : result.to_remove) { + metadata_builder_->RemoveStatistics(snapshot_id); + } + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -280,4 +293,11 @@ Result> Transaction::NewExpireSnapshots() { return expire_snapshots; } +Result> Transaction::NewUpdateStatistics() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_statistics, + UpdateStatistics::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_statistics)); + return update_statistics; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 057a27a90..d6ee887f2 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewExpireSnapshots(); + /// \brief Create a new UpdateStatistics to update table statistics and commit the + /// changes. + Result> NewUpdateStatistics(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index c8854031d..e3283d496 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -187,13 +187,14 @@ class TableUpdateContext; class Transaction; /// \brief Update family. +class ExpireSnapshots; class PendingUpdate; class SnapshotUpdate; class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; class UpdateSortOrder; -class ExpireSnapshots; +class UpdateStatistics; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4238e0222..d33afa6a8 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -23,6 +23,7 @@ install_headers( 'update_schema.h', 'update_sort_order.h', 'update_properties.h', + 'update_statistics.h', ], subdir: 'iceberg/update', ) diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 8a8329eee..2378dbf8f 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -48,6 +48,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { kUpdateSchema, kUpdateSnapshot, kUpdateSortOrder, + kUpdateStatistics, }; /// \brief Return the kind of this pending update. diff --git a/src/iceberg/update/update_statistics.cc b/src/iceberg/update/update_statistics.cc new file mode 100644 index 000000000..f0f3dfbd5 --- /dev/null +++ b/src/iceberg/update/update_statistics.cc @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_statistics.h" + +#include +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/statistics_file.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> UpdateStatistics::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateStatistics without a transaction"); + return std::shared_ptr(new UpdateStatistics(std::move(transaction))); +} + +UpdateStatistics::UpdateStatistics(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +UpdateStatistics::~UpdateStatistics() = default; + +UpdateStatistics& UpdateStatistics::SetStatistics( + const std::shared_ptr& statistics_file) { + ICEBERG_BUILDER_CHECK(statistics_file != nullptr, "Statistics file cannot be null"); + statistics_to_set_[statistics_file->snapshot_id] = statistics_file; + return *this; +} + +UpdateStatistics& UpdateStatistics::RemoveStatistics(int64_t snapshot_id) { + statistics_to_set_[snapshot_id] = nullptr; + return *this; +} + +Result UpdateStatistics::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + + ApplyResult result; + for (const auto& [snapshot_id, stats] : statistics_to_set_) { + if (stats) { + result.to_set[snapshot_id] = stats; + } else { + result.to_remove.push_back(snapshot_id); + } + } + return result; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_statistics.h b/src/iceberg/update/update_statistics.h new file mode 100644 index 000000000..49ee2b091 --- /dev/null +++ b/src/iceberg/update/update_statistics.h @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_statistics.h +/// \brief Updates table statistics. + +namespace iceberg { + +/// \brief Updates table statistics. +class ICEBERG_EXPORT UpdateStatistics : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdateStatistics() override; + + /// \brief Set statistics file for a snapshot. + /// + /// Associates a statistics file with a snapshot ID. If statistics already exist + /// for this snapshot, they will be replaced. + /// + /// \param statistics_file The statistics file to set + /// \return Reference to this UpdateStatistics for chaining + UpdateStatistics& SetStatistics(const std::shared_ptr& statistics_file); + + /// \brief Remove statistics for a snapshot. + /// + /// Marks the statistics for the given snapshot ID for removal. + /// + /// \param snapshot_id The snapshot ID whose statistics to remove + /// \return Reference to this UpdateStatistics for chaining + UpdateStatistics& RemoveStatistics(int64_t snapshot_id); + + Kind kind() const final { return Kind::kUpdateStatistics; } + + struct ApplyResult { + std::unordered_map> to_set; + std::vector to_remove; + }; + + Result Apply(); + + private: + explicit UpdateStatistics(std::shared_ptr transaction); + + std::unordered_map> statistics_to_set_; +}; + +} // namespace iceberg