diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 86a0efd7e..103168b25 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -84,6 +84,7 @@ set(ICEBERG_SOURCES update/expire_snapshots.cc update/pending_update.cc update/snapshot_update.cc + update/update_location.cc update/update_partition_spec.cc update/update_properties.cc update/update_schema.cc diff --git a/src/iceberg/meson.build b/src/iceberg/meson.build index 87f508cd5..317b4fa9e 100644 --- a/src/iceberg/meson.build +++ b/src/iceberg/meson.build @@ -105,6 +105,7 @@ iceberg_sources = files( 'update/expire_snapshots.cc', 'update/pending_update.cc', 'update/snapshot_update.cc', + 'update/update_location.cc', 'update/update_partition_spec.cc', 'update/update_properties.cc', 'update/update_schema.cc', diff --git a/src/iceberg/table.cc b/src/iceberg/table.cc index c79ac53fb..5c406debc 100644 --- a/src/iceberg/table.cc +++ b/src/iceberg/table.cc @@ -192,6 +192,13 @@ Result> Table::NewExpireSnapshots() { return transaction->NewExpireSnapshots(); } +Result> Table::NewUpdateLocation() { + ICEBERG_ASSIGN_OR_RAISE( + auto transaction, Transaction::Make(shared_from_this(), Transaction::Kind::kUpdate, + /*auto_commit=*/true)); + return transaction->NewUpdateLocation(); +} + Result> StagedTable::Make( TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/table.h b/src/iceberg/table.h index cc9482486..fd346e15a 100644 --- a/src/iceberg/table.h +++ b/src/iceberg/table.h @@ -29,6 +29,7 @@ #include "iceberg/snapshot.h" #include "iceberg/table_identifier.h" #include "iceberg/type_fwd.h" +#include "iceberg/update/update_location.h" #include "iceberg/util/timepoint.h" namespace iceberg { @@ -151,6 +152,10 @@ class ICEBERG_EXPORT Table : public std::enable_shared_from_this { /// changes. virtual Result> NewExpireSnapshots(); + /// \brief Create a new UpdateLocation to update the table location and commit the + /// changes. + virtual Result> NewUpdateLocation(); + protected: Table(TableIdentifier identifier, std::shared_ptr metadata, std::string metadata_location, std::shared_ptr io, diff --git a/src/iceberg/test/CMakeLists.txt b/src/iceberg/test/CMakeLists.txt index 1f6ab5521..364c690e6 100644 --- a/src/iceberg/test/CMakeLists.txt +++ b/src/iceberg/test/CMakeLists.txt @@ -172,6 +172,7 @@ if(ICEBERG_BUILD_BUNDLE) SOURCES expire_snapshots_test.cc transaction_test.cc + update_location_test.cc update_partition_spec_test.cc update_properties_test.cc update_schema_test.cc diff --git a/src/iceberg/test/update_location_test.cc b/src/iceberg/test/update_location_test.cc new file mode 100644 index 000000000..53b347b56 --- /dev/null +++ b/src/iceberg/test/update_location_test.cc @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_location.h" + +#include +#include + +#include +#include +#include + +#include "iceberg/arrow/arrow_fs_file_io_internal.h" +#include "iceberg/result.h" +#include "iceberg/test/matchers.h" +#include "iceberg/test/update_test_base.h" + +namespace iceberg { + +class UpdateLocationTest : public UpdateTestBase {}; + +TEST_F(UpdateLocationTest, SetLocationSuccess) { + const std::string new_location = "/warehouse/new_location"; + + // Create metadata directory for the new location + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(new_location + "/metadata").ok()); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation()); + update->SetLocation(new_location); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result, new_location); + + // Commit and verify the location was persisted + EXPECT_THAT(update->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_EQ(reloaded->location(), new_location); +} + +TEST_F(UpdateLocationTest, SetLocationMultipleTimes) { + // Test that setting location multiple times uses the last value + const std::string final_location = "/warehouse/final_location"; + + // Create metadata directory for the new location + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + ASSERT_TRUE(arrow_fs->CreateDir(final_location + "/metadata").ok()); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation()); + update->SetLocation("/warehouse/first_location") + .SetLocation("/warehouse/second_location") + .SetLocation(final_location); + + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result, final_location); + + // Commit and verify the final location was persisted + EXPECT_THAT(update->Commit(), IsOk()); + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_EQ(reloaded->location(), final_location); +} + +TEST_F(UpdateLocationTest, SetEmptyLocation) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation()); + update->SetLocation(""); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kValidationFailed)); + EXPECT_THAT(result, HasErrorMessage("Location cannot be empty")); +} + +TEST_F(UpdateLocationTest, ApplyWithoutSettingLocation) { + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation()); + + auto result = update->Apply(); + EXPECT_THAT(result, IsError(ErrorKind::kInvalidArgument)); + EXPECT_THAT(result, HasErrorMessage("Location must be set before applying")); +} + +TEST_F(UpdateLocationTest, MultipleUpdatesSequentially) { + // Get arrow_fs for creating directories + auto arrow_fs = std::dynamic_pointer_cast<::arrow::fs::internal::MockFileSystem>( + static_cast(*file_io_).fs()); + ASSERT_TRUE(arrow_fs != nullptr); + + // First update + const std::string first_location = "/warehouse/first"; + ASSERT_TRUE(arrow_fs->CreateDir(first_location + "/metadata").ok()); + + ICEBERG_UNWRAP_OR_FAIL(auto update, table_->NewUpdateLocation()); + update->SetLocation(first_location); + ICEBERG_UNWRAP_OR_FAIL(auto result, update->Apply()); + EXPECT_EQ(result, first_location); + EXPECT_THAT(update->Commit(), IsOk()); + + // Reload and verify + ICEBERG_UNWRAP_OR_FAIL(auto reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_EQ(reloaded->location(), first_location); + + // Second update + const std::string second_location = "/warehouse/second"; + ASSERT_TRUE(arrow_fs->CreateDir(second_location + "/metadata").ok()); + + ICEBERG_UNWRAP_OR_FAIL(update, reloaded->NewUpdateLocation()); + update->SetLocation(second_location); + ICEBERG_UNWRAP_OR_FAIL(result, update->Apply()); + EXPECT_EQ(result, second_location); + EXPECT_THAT(update->Commit(), IsOk()); + + // Reload and verify + ICEBERG_UNWRAP_OR_FAIL(reloaded, catalog_->LoadTable(table_ident_)); + EXPECT_EQ(reloaded->location(), second_location); +} + +} // namespace iceberg diff --git a/src/iceberg/transaction.cc b/src/iceberg/transaction.cc index f763c5670..10a87e653 100644 --- a/src/iceberg/transaction.cc +++ b/src/iceberg/transaction.cc @@ -33,6 +33,7 @@ #include "iceberg/update/expire_snapshots.h" #include "iceberg/update/pending_update.h" #include "iceberg/update/snapshot_update.h" +#include "iceberg/update/update_location.h" #include "iceberg/update/update_partition_spec.h" #include "iceberg/update/update_properties.h" #include "iceberg/update/update_schema.h" @@ -183,6 +184,11 @@ Status Transaction::Apply(PendingUpdate& update) { metadata_builder_->RemoveSchemas(std::move(result.schema_ids_to_remove)); } } break; + case PendingUpdate::Kind::kUpdateLocation: { + auto& update_location = internal::checked_cast(update); + ICEBERG_ASSIGN_OR_RAISE(auto location, update_location.Apply()); + metadata_builder_->SetLocation(location); + } break; default: return NotSupported("Unsupported pending update: {}", static_cast(update.kind())); @@ -280,4 +286,11 @@ Result> Transaction::NewExpireSnapshots() { return expire_snapshots; } +Result> Transaction::NewUpdateLocation() { + ICEBERG_ASSIGN_OR_RAISE(std::shared_ptr update_location, + UpdateLocation::Make(shared_from_this())); + ICEBERG_RETURN_UNEXPECTED(AddUpdate(update_location)); + return update_location; +} + } // namespace iceberg diff --git a/src/iceberg/transaction.h b/src/iceberg/transaction.h index 057a27a90..7133a3b5d 100644 --- a/src/iceberg/transaction.h +++ b/src/iceberg/transaction.h @@ -82,6 +82,10 @@ class ICEBERG_EXPORT Transaction : public std::enable_shared_from_this> NewExpireSnapshots(); + /// \brief Create a new UpdateLocation to update the table location and commit the + /// changes. + Result> NewUpdateLocation(); + private: Transaction(std::shared_ptr
table, Kind kind, bool auto_commit, std::unique_ptr metadata_builder); diff --git a/src/iceberg/type_fwd.h b/src/iceberg/type_fwd.h index c8854031d..251334c14 100644 --- a/src/iceberg/type_fwd.h +++ b/src/iceberg/type_fwd.h @@ -187,13 +187,14 @@ class TableUpdateContext; class Transaction; /// \brief Update family. +class ExpireSnapshots; class PendingUpdate; class SnapshotUpdate; +class UpdateLocation; class UpdatePartitionSpec; class UpdateProperties; class UpdateSchema; class UpdateSortOrder; -class ExpireSnapshots; /// ---------------------------------------------------------------------------- /// TODO: Forward declarations below are not added yet. diff --git a/src/iceberg/update/meson.build b/src/iceberg/update/meson.build index 4238e0222..3387fd11a 100644 --- a/src/iceberg/update/meson.build +++ b/src/iceberg/update/meson.build @@ -17,8 +17,10 @@ install_headers( [ + 'expire_snapshots.h', 'pending_update.h', 'snapshot_update.h', + 'update_location.h', 'update_partition_spec.h', 'update_schema.h', 'update_sort_order.h', diff --git a/src/iceberg/update/pending_update.h b/src/iceberg/update/pending_update.h index 8a8329eee..441d086a8 100644 --- a/src/iceberg/update/pending_update.h +++ b/src/iceberg/update/pending_update.h @@ -43,6 +43,7 @@ class ICEBERG_EXPORT PendingUpdate : public ErrorCollector { public: enum class Kind : uint8_t { kExpireSnapshots, + kUpdateLocation, kUpdatePartitionSpec, kUpdateProperties, kUpdateSchema, diff --git a/src/iceberg/update/update_location.cc b/src/iceberg/update/update_location.cc new file mode 100644 index 000000000..c82a138fc --- /dev/null +++ b/src/iceberg/update/update_location.cc @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/update/update_location.h" + +#include +#include +#include + +#include "iceberg/result.h" +#include "iceberg/transaction.h" +#include "iceberg/util/macros.h" + +namespace iceberg { + +Result> UpdateLocation::Make( + std::shared_ptr transaction) { + ICEBERG_PRECHECK(transaction != nullptr, + "Cannot create UpdateLocation without a transaction"); + return std::shared_ptr(new UpdateLocation(std::move(transaction))); +} + +UpdateLocation::UpdateLocation(std::shared_ptr transaction) + : PendingUpdate(std::move(transaction)) {} + +UpdateLocation::~UpdateLocation() = default; + +UpdateLocation& UpdateLocation::SetLocation(std::string_view location) { + ICEBERG_BUILDER_CHECK(!location.empty(), "Location cannot be empty"); + location_ = std::string(location); + return *this; +} + +Result UpdateLocation::Apply() { + ICEBERG_RETURN_UNEXPECTED(CheckErrors()); + if (location_.empty()) { + return InvalidArgument("Location must be set before applying"); + } + return location_; +} + +} // namespace iceberg diff --git a/src/iceberg/update/update_location.h b/src/iceberg/update/update_location.h new file mode 100644 index 000000000..891853e9e --- /dev/null +++ b/src/iceberg/update/update_location.h @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +#include +#include + +#include "iceberg/type_fwd.h" +#include "iceberg/update/pending_update.h" + +/// \file iceberg/update/update_location.h +/// \brief Updates the table location. + +namespace iceberg { + +/// \brief Updating table location with a new base location. +class ICEBERG_EXPORT UpdateLocation : public PendingUpdate { + public: + static Result> Make( + std::shared_ptr transaction); + + ~UpdateLocation() override; + + /// \brief Sets the new location for the table. + /// + /// \param location The new table location + /// \return Reference to this for method chaining + UpdateLocation& SetLocation(std::string_view location); + + Kind kind() const final { return Kind::kUpdateLocation; } + + /// \brief Apply the pending changes and return the new location. + Result Apply(); + + private: + explicit UpdateLocation(std::shared_ptr transaction); + + std::string location_; +}; + +} // namespace iceberg