From a213dfc57415683db6a0365f2d886da7a771b478 Mon Sep 17 00:00:00 2001 From: Xinli Shang Date: Mon, 29 Dec 2025 12:11:57 -0800 Subject: [PATCH 1/2] feat: add FileWriterFactory with separated writer files Add FileWriterFactory to create data and delete writers, with comprehensive improvements per review feedback. File Organization: - Separate DataWriter to data_writer.h/.cc - Separate PositionDeleteWriter to position_delete_writer.h/.cc - Separate EqualityDeleteWriter to equality_delete_writer.h/.cc - Separate FileWriterFactory to file_writer_factory.h/.cc - Keep only FileWriter base interface in writer.h/.cc Key Features: - Input validation for all factory methods (path, schema, spec) - Thread safety documentation (NOT thread-safe) - State management in stub implementations (is_closed tracking) - Support for Parquet and Avro formats - Pass-by-value + std::move for sink parameters Implementation: - FileWriterFactory directly creates writers (true factory pattern) - Writers use friend pattern - only factory can construct them - Internal MakeXxxInternal functions handle cross-file construction - Stub implementations validate inputs before returning NotImplemented Related to #441 --- src/iceberg/CMakeLists.txt | 4 + src/iceberg/data/data_writer.cc | 73 +++ src/iceberg/data/data_writer.h | 78 +++ src/iceberg/data/equality_delete_writer.cc | 79 +++ src/iceberg/data/equality_delete_writer.h | 83 +++ src/iceberg/data/file_writer_factory.cc | 167 +++++ src/iceberg/data/file_writer_factory.h | 83 +++ src/iceberg/data/position_delete_writer.cc | 85 +++ src/iceberg/data/position_delete_writer.h | 81 +++ src/iceberg/data/writer.h | 19 +- src/iceberg/test/data_writer_test.cc | 713 +++++++++++++++++++++ 11 files changed, 1461 insertions(+), 4 deletions(-) create mode 100644 src/iceberg/data/data_writer.cc create mode 100644 src/iceberg/data/data_writer.h create mode 100644 src/iceberg/data/equality_delete_writer.cc create mode 100644 src/iceberg/data/equality_delete_writer.h create mode 100644 src/iceberg/data/file_writer_factory.cc create mode 100644 src/iceberg/data/file_writer_factory.h create mode 100644 src/iceberg/data/position_delete_writer.cc create mode 100644 src/iceberg/data/position_delete_writer.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 86a0efd7e..190114645 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -20,6 +20,10 @@ set(ICEBERG_INCLUDES "$" set(ICEBERG_SOURCES arrow_c_data_guard_internal.cc catalog/memory/in_memory_catalog.cc + data/data_writer.cc + data/equality_delete_writer.cc + data/file_writer_factory.cc + data/position_delete_writer.cc data/writer.cc delete_file_index.cc expression/aggregate.cc diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc new file mode 100644 index 000000000..d3648294c --- /dev/null +++ b/src/iceberg/data/data_writer.cc @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/data_writer.h" + +namespace iceberg { + +//============================================================================= +// DataWriter - stub implementation (to be completed in separate PR per #441) +//============================================================================= + +class DataWriter::Impl { + public: + explicit Impl(DataWriterOptions options) : options_(std::move(options)) {} + DataWriterOptions options_; + bool is_closed_ = false; +}; + +DataWriter::DataWriter(std::unique_ptr impl) : impl_(std::move(impl)) {} +DataWriter::~DataWriter() = default; + +Status DataWriter::Write(ArrowArray* data) { + if (!data) { + return InvalidArgument("Cannot write null data"); + } + if (impl_->is_closed_) { + return Invalid("Writer is already closed"); + } + return NotImplemented("DataWriter not yet implemented - see #441"); +} + +Result DataWriter::Length() const { + return NotImplemented("DataWriter not yet implemented - see #441"); +} + +Status DataWriter::Close() { + if (impl_->is_closed_) { + return {}; // Close is idempotent + } + impl_->is_closed_ = true; + return NotImplemented("DataWriter not yet implemented - see #441"); +} + +Result DataWriter::Metadata() { + if (!impl_->is_closed_) { + return Invalid("Writer must be closed before getting metadata"); + } + return NotImplemented("DataWriter not yet implemented - see #441"); +} + +// Internal factory function for FileWriterFactory +std::unique_ptr MakeDataWriterInternal(const DataWriterOptions& options) { + auto impl = std::make_unique(options); + return std::unique_ptr(new DataWriter(std::move(impl))); +} + +} // namespace iceberg diff --git a/src/iceberg/data/data_writer.h b/src/iceberg/data/data_writer.h new file mode 100644 index 000000000..cd7e4d978 --- /dev/null +++ b/src/iceberg/data/data_writer.h @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/data_writer.h +/// Data writer for Iceberg tables. + +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/data/writer.h" +#include "iceberg/file_format.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Options for creating a DataWriter. +/// +/// \note The following features from Java DataWriter are not yet supported: +/// - Encryption key metadata (uses FileIO instead of EncryptedOutputFile) +/// - Metrics collection and reporting +/// - Split offsets tracking +struct ICEBERG_EXPORT DataWriterOptions { + std::string path; + std::shared_ptr schema; + std::shared_ptr spec; + PartitionValues partition; + FileFormatType format = FileFormatType::kParquet; + std::shared_ptr io; + std::optional sort_order_id; + std::shared_ptr properties; +}; + +/// \brief Writer for Iceberg data files. +/// +/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only +/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently. +class ICEBERG_EXPORT DataWriter : public FileWriter { + public: + ~DataWriter() override; + + Status Write(ArrowArray* data) override; + Result Length() const override; + Status Close() override; + Result Metadata() override; + + private: + friend class FileWriterFactory; + friend std::unique_ptr MakeDataWriterInternal(const DataWriterOptions&); + class Impl; + std::unique_ptr impl_; + explicit DataWriter(std::unique_ptr impl); +}; + +} // namespace iceberg diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc new file mode 100644 index 000000000..8d3487e5e --- /dev/null +++ b/src/iceberg/data/equality_delete_writer.cc @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/equality_delete_writer.h" + +namespace iceberg { + +//============================================================================= +// EqualityDeleteWriter - stub implementation (to be completed in separate PR per #441) +//============================================================================= + +class EqualityDeleteWriter::Impl { + public: + explicit Impl(EqualityDeleteWriterOptions options) : options_(std::move(options)) {} + EqualityDeleteWriterOptions options_; + bool is_closed_ = false; +}; + +EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} +EqualityDeleteWriter::~EqualityDeleteWriter() = default; + +Status EqualityDeleteWriter::Write(ArrowArray* data) { + if (!data) { + return InvalidArgument("Cannot write null data"); + } + if (impl_->is_closed_) { + return Invalid("Writer is already closed"); + } + return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); +} + +Result EqualityDeleteWriter::Length() const { + return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); +} + +Status EqualityDeleteWriter::Close() { + if (impl_->is_closed_) { + return {}; // Close is idempotent + } + impl_->is_closed_ = true; + return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); +} + +Result EqualityDeleteWriter::Metadata() { + if (!impl_->is_closed_) { + return Invalid("Writer must be closed before getting metadata"); + } + return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); +} + +const std::vector& EqualityDeleteWriter::equality_field_ids() const { + return impl_->options_.equality_field_ids; +} + +// Internal factory function for FileWriterFactory +std::unique_ptr MakeEqualityDeleteWriterInternal( + const EqualityDeleteWriterOptions& options) { + auto impl = std::make_unique(options); + return std::unique_ptr(new EqualityDeleteWriter(std::move(impl))); +} + +} // namespace iceberg diff --git a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h new file mode 100644 index 000000000..100e96194 --- /dev/null +++ b/src/iceberg/data/equality_delete_writer.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/equality_delete_writer.h +/// Equality delete writer for Iceberg tables. + +#include +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/data/writer.h" +#include "iceberg/file_format.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Options for creating an EqualityDeleteWriter. +/// +/// \note The following features from Java EqualityDeleteWriter are not yet supported: +/// - Encryption key metadata +/// - Metrics collection and reporting +/// - Split offsets tracking +struct ICEBERG_EXPORT EqualityDeleteWriterOptions { + std::string path; + std::shared_ptr schema; + std::shared_ptr spec; + PartitionValues partition; + FileFormatType format = FileFormatType::kParquet; + std::shared_ptr io; + std::vector equality_field_ids; + std::optional sort_order_id; + std::shared_ptr properties; +}; + +/// \brief Writer for Iceberg equality delete files. +/// +/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only +/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently. +class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { + public: + ~EqualityDeleteWriter() override; + + Status Write(ArrowArray* data) override; + Result Length() const override; + Status Close() override; + Result Metadata() override; + + const std::vector& equality_field_ids() const; + + private: + friend class FileWriterFactory; + friend std::unique_ptr MakeEqualityDeleteWriterInternal( + const EqualityDeleteWriterOptions&); + class Impl; + std::unique_ptr impl_; + explicit EqualityDeleteWriter(std::unique_ptr impl); +}; + +} // namespace iceberg diff --git a/src/iceberg/data/file_writer_factory.cc b/src/iceberg/data/file_writer_factory.cc new file mode 100644 index 000000000..69834be8b --- /dev/null +++ b/src/iceberg/data/file_writer_factory.cc @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/file_writer_factory.h" + +#include "iceberg/data/data_writer.h" +#include "iceberg/data/equality_delete_writer.h" +#include "iceberg/data/position_delete_writer.h" + +namespace iceberg { + +// Forward declarations for internal factory functions +std::unique_ptr MakeDataWriterInternal(const DataWriterOptions& options); +std::unique_ptr MakePositionDeleteWriterInternal( + const PositionDeleteWriterOptions& options); +std::unique_ptr MakeEqualityDeleteWriterInternal( + const EqualityDeleteWriterOptions& options); + +//============================================================================= +// FileWriterFactory::Impl +//============================================================================= + +class FileWriterFactory::Impl { + public: + Impl(std::shared_ptr schema, std::shared_ptr spec, + std::shared_ptr io, std::shared_ptr properties) + : schema_(std::move(schema)), + spec_(std::move(spec)), + io_(std::move(io)), + properties_(std::move(properties)) {} + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; + std::shared_ptr properties_; + + std::shared_ptr eq_delete_schema_; + std::vector equality_field_ids_; + std::shared_ptr pos_delete_row_schema_; +}; + +//============================================================================= +// FileWriterFactory +//============================================================================= + +FileWriterFactory::FileWriterFactory(std::shared_ptr schema, + std::shared_ptr spec, + std::shared_ptr io, + std::shared_ptr properties) + : impl_(std::make_unique(std::move(schema), std::move(spec), std::move(io), + std::move(properties))) {} + +FileWriterFactory::~FileWriterFactory() = default; + +void FileWriterFactory::SetEqualityDeleteConfig(std::shared_ptr eq_delete_schema, + std::vector equality_field_ids) { + impl_->eq_delete_schema_ = std::move(eq_delete_schema); + impl_->equality_field_ids_ = std::move(equality_field_ids); +} + +void FileWriterFactory::SetPositionDeleteRowSchema( + std::shared_ptr pos_delete_row_schema) { + impl_->pos_delete_row_schema_ = std::move(pos_delete_row_schema); +} + +Result> FileWriterFactory::NewDataWriter( + std::string path, FileFormatType format, PartitionValues partition, + std::optional sort_order_id) { + // Input validation + if (path.empty()) { + return InvalidArgument("Path cannot be empty"); + } + if (!impl_->schema_) { + return InvalidArgument("Schema cannot be null"); + } + if (!impl_->spec_) { + return InvalidArgument("PartitionSpec cannot be null"); + } + + DataWriterOptions options; + options.path = std::move(path); + options.schema = impl_->schema_; + options.spec = impl_->spec_; + options.partition = std::move(partition); + options.format = format; + options.io = impl_->io_; + options.sort_order_id = sort_order_id; + options.properties = impl_->properties_; + + return MakeDataWriterInternal(options); +} + +Result> FileWriterFactory::NewPositionDeleteWriter( + std::string path, FileFormatType format, PartitionValues partition) { + // Input validation + if (path.empty()) { + return InvalidArgument("Path cannot be empty"); + } + if (!impl_->schema_) { + return InvalidArgument("Schema cannot be null"); + } + if (!impl_->spec_) { + return InvalidArgument("PartitionSpec cannot be null"); + } + + PositionDeleteWriterOptions options; + options.path = std::move(path); + options.schema = impl_->schema_; + options.spec = impl_->spec_; + options.partition = std::move(partition); + options.format = format; + options.io = impl_->io_; + options.row_schema = impl_->pos_delete_row_schema_; + options.properties = impl_->properties_; + + return MakePositionDeleteWriterInternal(options); +} + +Result> FileWriterFactory::NewEqualityDeleteWriter( + std::string path, FileFormatType format, PartitionValues partition, + std::optional sort_order_id) { + // Input validation + if (path.empty()) { + return InvalidArgument("Path cannot be empty"); + } + if (!impl_->schema_) { + return InvalidArgument("Schema cannot be null"); + } + if (!impl_->spec_) { + return InvalidArgument("PartitionSpec cannot be null"); + } + if (impl_->equality_field_ids_.empty()) { + return InvalidArgument( + "Equality field IDs cannot be empty - call SetEqualityDeleteConfig first"); + } + + EqualityDeleteWriterOptions options; + options.path = std::move(path); + options.schema = impl_->eq_delete_schema_ ? impl_->eq_delete_schema_ : impl_->schema_; + options.spec = impl_->spec_; + options.partition = std::move(partition); + options.format = format; + options.io = impl_->io_; + options.equality_field_ids = impl_->equality_field_ids_; + options.sort_order_id = sort_order_id; + options.properties = impl_->properties_; + + return MakeEqualityDeleteWriterInternal(options); +} + +} // namespace iceberg diff --git a/src/iceberg/data/file_writer_factory.h b/src/iceberg/data/file_writer_factory.h new file mode 100644 index 000000000..af0e99c6a --- /dev/null +++ b/src/iceberg/data/file_writer_factory.h @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/file_writer_factory.h +/// Factory for creating Iceberg file writers. + +#include +#include +#include +#include +#include + +#include "iceberg/file_format.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +// Forward declarations +class DataWriter; +class PositionDeleteWriter; +class EqualityDeleteWriter; + +/// \brief Factory for creating Iceberg file writers. +/// +/// \warning Thread Safety: This class is NOT thread-safe. Each FileWriterFactory instance +/// should only be used by a single thread. To use from multiple threads, create separate +/// factory instances or use external synchronization. +/// +/// \note Differences from Java FileWriterFactory: +/// - Java uses EncryptedOutputFile parameter, C++ uses path + FileIO +/// - C++ factory has state (schema, spec, io) configured once, reused for all writers +/// - Java FileWriterFactory is an interface, C++ is a concrete class with configuration +/// - C++ provides SetEqualityDeleteConfig() and SetPositionDeleteRowSchema() for +/// customization +class ICEBERG_EXPORT FileWriterFactory { + public: + FileWriterFactory(std::shared_ptr schema, std::shared_ptr spec, + std::shared_ptr io, + std::shared_ptr properties = nullptr); + ~FileWriterFactory(); + + void SetEqualityDeleteConfig(std::shared_ptr eq_delete_schema, + std::vector equality_field_ids); + void SetPositionDeleteRowSchema(std::shared_ptr pos_delete_row_schema); + + Result> NewDataWriter( + std::string path, FileFormatType format, PartitionValues partition, + std::optional sort_order_id = std::nullopt); + + Result> NewPositionDeleteWriter( + std::string path, FileFormatType format, PartitionValues partition); + + Result> NewEqualityDeleteWriter( + std::string path, FileFormatType format, PartitionValues partition, + std::optional sort_order_id = std::nullopt); + + private: + class Impl; + std::unique_ptr impl_; +}; + +} // namespace iceberg diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc new file mode 100644 index 000000000..836b47532 --- /dev/null +++ b/src/iceberg/data/position_delete_writer.cc @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "iceberg/data/position_delete_writer.h" + +namespace iceberg { + +//============================================================================= +// PositionDeleteWriter - stub implementation (to be completed in separate PR per #441) +//============================================================================= + +class PositionDeleteWriter::Impl { + public: + explicit Impl(PositionDeleteWriterOptions options) : options_(std::move(options)) {} + PositionDeleteWriterOptions options_; + bool is_closed_ = false; +}; + +PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr impl) + : impl_(std::move(impl)) {} +PositionDeleteWriter::~PositionDeleteWriter() = default; + +Status PositionDeleteWriter::Write(ArrowArray* data) { + if (!data) { + return InvalidArgument("Cannot write null data"); + } + if (impl_->is_closed_) { + return Invalid("Writer is already closed"); + } + return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); +} + +Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) { + if (file_path.empty()) { + return InvalidArgument("File path cannot be empty"); + } + if (impl_->is_closed_) { + return Invalid("Writer is already closed"); + } + return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); +} + +Result PositionDeleteWriter::Length() const { + return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); +} + +Status PositionDeleteWriter::Close() { + if (impl_->is_closed_) { + return {}; // Close is idempotent + } + impl_->is_closed_ = true; + return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); +} + +Result PositionDeleteWriter::Metadata() { + if (!impl_->is_closed_) { + return Invalid("Writer must be closed before getting metadata"); + } + return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); +} + +// Internal factory function for FileWriterFactory +std::unique_ptr MakePositionDeleteWriterInternal( + const PositionDeleteWriterOptions& options) { + auto impl = std::make_unique(options); + return std::unique_ptr(new PositionDeleteWriter(std::move(impl))); +} + +} // namespace iceberg diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h new file mode 100644 index 000000000..849d65224 --- /dev/null +++ b/src/iceberg/data/position_delete_writer.h @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#pragma once + +/// \file iceberg/data/position_delete_writer.h +/// Position delete writer for Iceberg tables. + +#include +#include +#include +#include + +#include "iceberg/arrow_c_data.h" +#include "iceberg/data/writer.h" +#include "iceberg/file_format.h" +#include "iceberg/iceberg_export.h" +#include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/type_fwd.h" + +namespace iceberg { + +/// \brief Options for creating a PositionDeleteWriter. +/// +/// \note The following features from Java PositionDeleteWriter are not yet supported: +/// - Encryption key metadata +/// - Referenced data files tracking (CharSequenceSet referencedDataFiles) +/// - Metrics stripping for multi-file deletes +/// - Split offsets tracking +struct ICEBERG_EXPORT PositionDeleteWriterOptions { + std::string path; + std::shared_ptr schema; + std::shared_ptr spec; + PartitionValues partition; + FileFormatType format = FileFormatType::kParquet; + std::shared_ptr io; + std::shared_ptr row_schema; // Optional row data schema + std::shared_ptr properties; +}; + +/// \brief Writer for Iceberg position delete files. +/// +/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only +/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently. +class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter { + public: + ~PositionDeleteWriter() override; + + Status Write(ArrowArray* data) override; + Status WriteDelete(std::string_view file_path, int64_t pos); + Result Length() const override; + Status Close() override; + Result Metadata() override; + + private: + friend class FileWriterFactory; + friend std::unique_ptr MakePositionDeleteWriterInternal( + const PositionDeleteWriterOptions&); + class Impl; + std::unique_ptr impl_; + explicit PositionDeleteWriter(std::unique_ptr impl); +}; + +} // namespace iceberg diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index 6c8400911..371f123df 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -28,8 +28,8 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/iceberg_export.h" +#include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" -#include "iceberg/type_fwd.h" namespace iceberg { @@ -64,10 +64,21 @@ class ICEBERG_EXPORT FileWriter { virtual Status Close() = 0; /// \brief File metadata for all files produced by the writer. + /// + /// \note The following features from Java are not yet supported: + /// - Encryption key metadata (EncryptionKeyMetadata) + /// - Split offsets for data files + /// - Referenced data files tracking for position deletes struct ICEBERG_EXPORT WriteResult { - /// Usually a writer produces a single data or delete file. - /// Position delete writer may produce multiple file-scoped delete files. - /// In the future, multiple files can be produced if file rolling is supported. + /// Data files produced by this writer. + /// + /// \note When multiple files are produced: + /// - Position delete writers may produce multiple file-scoped delete files (one per + /// referenced data file) to avoid large manifests + /// - Future feature: File rolling will produce multiple files when size/record + /// thresholds + /// are exceeded + /// - Current implementation: Each writer produces exactly one file std::vector> data_files; }; diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index df7ea9d89..8ba3ab8dc 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -17,6 +17,8 @@ * under the License. */ +#include "iceberg/data/data_writer.h" + #include #include @@ -24,10 +26,19 @@ #include #include "iceberg/arrow_c_data.h" +#include "iceberg/data/equality_delete_writer.h" +#include "iceberg/data/file_writer_factory.h" +#include "iceberg/data/position_delete_writer.h" #include "iceberg/data/writer.h" +#include "iceberg/file_format.h" +#include "iceberg/file_io.h" #include "iceberg/manifest/manifest_entry.h" +#include "iceberg/partition_spec.h" #include "iceberg/result.h" +#include "iceberg/row/partition_values.h" +#include "iceberg/schema.h" #include "iceberg/test/matchers.h" +#include "iceberg/transform.h" namespace iceberg { @@ -215,4 +226,706 @@ TEST(FileWriterTest, EmptyWriteResult) { ASSERT_TRUE(result.data_files.empty()); } +// Tests for stub implementations (methods return NotImplemented) +class WriterStubTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; // Not needed for stub tests + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(WriterStubTest, DataWriterMethodsReturnNotImplemented) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + + auto& writer = *writer_result; + ArrowArray dummy_array = {}; + + // All methods should return NotImplemented + ASSERT_THAT(writer->Write(&dummy_array), + HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Length(), HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Metadata(), HasErrorMessage("DataWriter not yet implemented")); +} + +TEST_F(WriterStubTest, PositionDeleteWriterMethodsReturnNotImplemented) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos_deletes.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + + auto& writer = *writer_result; + ArrowArray dummy_array = {}; + + // All methods should return NotImplemented + ASSERT_THAT(writer->Write(&dummy_array), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->WriteDelete("/test/file.parquet", 0), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Length(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Close(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Metadata(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); +} + +TEST_F(WriterStubTest, EqualityDeleteWriterMethodsReturnNotImplemented) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + + PartitionValues partition; + auto writer_result = factory.NewEqualityDeleteWriter( + "/test/deletes/eq_deletes.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + + auto& writer = *writer_result; + ArrowArray dummy_array = {}; + + // All methods should return NotImplemented + ASSERT_THAT(writer->Write(&dummy_array), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Length(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Close(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + ASSERT_THAT(writer->Metadata(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + + // equality_field_ids should return the configured value + ASSERT_EQ(writer->equality_field_ids(), std::vector({1, 2})); +} + +// Tests for FileWriterFactory +class FileWriterFactoryTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string()), + SchemaField::MakeRequired(3, "category", string())}); + + ICEBERG_UNWRAP_OR_FAIL( + spec_, PartitionSpec::Make( + 0, {PartitionField(3, 1000, "category", Transform::Identity())})); + io_ = nullptr; // FileIO not needed for stub tests + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(FileWriterFactoryTest, ConstructorWithNullProperties) { + FileWriterFactory factory(schema_, spec_, io_, nullptr); + // Should construct successfully without properties +} + +TEST_F(FileWriterFactoryTest, NewDataWriterCreatesWriter) { + FileWriterFactory factory(schema_, spec_, io_); + + PartitionValues partition; + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + + // Factory should successfully create a writer + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewDataWriterWithSortOrder) { + FileWriterFactory factory(schema_, spec_, io_); + + PartitionValues partition; + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition, + /*sort_order_id=*/1); + + // Factory should successfully create a writer + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewPositionDeleteWriterCreatesWriter) { + FileWriterFactory factory(schema_, spec_, io_); + + PartitionValues partition; + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewPositionDeleteWriterWithRowSchema) { + FileWriterFactory factory(schema_, spec_, io_); + + auto row_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + factory.SetPositionDeleteRowSchema(row_schema); + + PartitionValues partition; + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterCreatesWriter) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Must set config first + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterWithConfig) { + FileWriterFactory factory(schema_, spec_, io_); + + auto eq_delete_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + std::vector equality_field_ids = {1, 2}; + factory.SetEqualityDeleteConfig(eq_delete_schema, equality_field_ids); + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterWithSortOrder) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Must set config first + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition, + /*sort_order_id=*/1); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterUsesDefaultSchema) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(nullptr, {1, 2}); // Set field IDs but no custom schema + + // Don't set custom equality delete schema - should use default schema + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterUsesCustomSchema) { + FileWriterFactory factory(schema_, spec_, io_); + + auto custom_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + std::vector equality_field_ids = {1}; + factory.SetEqualityDeleteConfig(custom_schema, equality_field_ids); + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, IsOk()); +} + +// Tests for input validation +class FileWriterFactoryInputValidationTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsEmptyPath) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("", FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsNullSchema) { + FileWriterFactory factory(nullptr, spec_, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + + ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsNullSpec) { + FileWriterFactory factory(schema_, nullptr, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + + ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsEmptyPath) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("", FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsNullSchema) { + FileWriterFactory factory(nullptr, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsNullSpec) { + FileWriterFactory factory(schema_, nullptr, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsEmptyPath) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("", FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsNullSchema) { + FileWriterFactory factory(nullptr, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsNullSpec) { + FileWriterFactory factory(schema_, nullptr, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); +} + +TEST_F(FileWriterFactoryInputValidationTest, + NewEqualityDeleteWriterRejectsEmptyFieldIds) { + FileWriterFactory factory(schema_, spec_, io_); + // Don't set equality config, so field IDs will be empty + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(result, HasErrorMessage("Equality field IDs cannot be empty")); +} + +// Tests for state management in stub writers +class WriterStubStateManagementTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(WriterStubStateManagementTest, DataWriterRejectsWriteAfterClose) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); + + // Now try to write - should fail because writer is closed + ArrowArray dummy_array = {}; + auto status = writer->Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, DataWriterCloseIsIdempotent) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close multiple times - second close should succeed (idempotent) + ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); + ASSERT_THAT(writer->Close(), IsOk()); + ASSERT_THAT(writer->Close(), IsOk()); +} + +TEST_F(WriterStubStateManagementTest, DataWriterRejectsNullData) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewDataWriter("/test/data/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsWriteAfterClose) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + + // Try to write - should fail + ArrowArray dummy_array = {}; + auto status = writer->Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsWriteDeleteAfterClose) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), + HasErrorMessage("PositionDeleteWriter not yet implemented")); + + // Try WriteDelete - should fail + auto status = writer->WriteDelete("/test/file.parquet", 100); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsNullData) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); +} + +TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsEmptyFilePath) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto writer_result = factory.NewPositionDeleteWriter( + "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->WriteDelete("", 100); + ASSERT_THAT(status, HasErrorMessage("File path cannot be empty")); +} + +TEST_F(WriterStubStateManagementTest, EqualityDeleteWriterRejectsWriteAfterClose) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto writer_result = factory.NewEqualityDeleteWriter( + "/test/deletes/eq.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + // Close the writer + ASSERT_THAT(writer->Close(), + HasErrorMessage("EqualityDeleteWriter not yet implemented")); + + // Try to write - should fail + ArrowArray dummy_array = {}; + auto status = writer->Write(&dummy_array); + ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); +} + +TEST_F(WriterStubStateManagementTest, EqualityDeleteWriterRejectsNullData) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + PartitionValues partition; + + auto writer_result = factory.NewEqualityDeleteWriter( + "/test/deletes/eq.parquet", FileFormatType::kParquet, partition); + ASSERT_THAT(writer_result, IsOk()); + auto& writer = *writer_result; + + auto status = writer->Write(nullptr); + ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); +} + +// Tests for different file formats +class FileWriterFactoryFileFormatTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(FileWriterFactoryFileFormatTest, DataWriterSupportsParquet) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, + partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, DataWriterSupportsAvro) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = + factory.NewDataWriter("/test/data/file.avro", FileFormatType::kAvro, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, PositionDeleteWriterSupportsParquet) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, PositionDeleteWriterSupportsAvro) { + FileWriterFactory factory(schema_, spec_, io_); + PartitionValues partition; + + auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.avro", + FileFormatType::kAvro, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, EqualityDeleteWriterSupportsParquet) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(FileWriterFactoryFileFormatTest, EqualityDeleteWriterSupportsAvro) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1}); + PartitionValues partition; + + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.avro", + FileFormatType::kAvro, partition); + ASSERT_THAT(result, IsOk()); +} + +// Edge case tests +class WriterEdgeCaseTest : public ::testing::Test { + protected: + void SetUp() override { + schema_ = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); + io_ = nullptr; + } + + std::shared_ptr schema_; + std::shared_ptr spec_; + std::shared_ptr io_; +}; + +TEST_F(WriterEdgeCaseTest, EqualityDeleteWriterWithSingleFieldId) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1}); // Single field ID + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); + + auto& writer = *result; + ASSERT_EQ(writer->equality_field_ids(), std::vector({1})); +} + +TEST_F(WriterEdgeCaseTest, EqualityDeleteWriterWithMultipleFieldIds) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Multiple field IDs + + PartitionValues partition; + auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); + + auto& writer = *result; + ASSERT_EQ(writer->equality_field_ids(), std::vector({1, 2})); +} + +TEST_F(WriterEdgeCaseTest, MultipleWritersFromSameFactory) { + FileWriterFactory factory(schema_, spec_, io_); + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + + PartitionValues partition; + + // Create multiple writers from the same factory + auto data_writer1 = + factory.NewDataWriter("/test/data1.parquet", FileFormatType::kParquet, partition); + auto data_writer2 = + factory.NewDataWriter("/test/data2.parquet", FileFormatType::kParquet, partition); + auto pos_writer = factory.NewPositionDeleteWriter("/test/pos.parquet", + FileFormatType::kParquet, partition); + auto eq_writer = factory.NewEqualityDeleteWriter("/test/eq.parquet", + FileFormatType::kParquet, partition); + + ASSERT_THAT(data_writer1, IsOk()); + ASSERT_THAT(data_writer2, IsOk()); + ASSERT_THAT(pos_writer, IsOk()); + ASSERT_THAT(eq_writer, IsOk()); +} + +TEST_F(WriterEdgeCaseTest, FactoryWithPartitionedSpec) { + auto partitioned_schema = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "category", string()), + SchemaField::MakeRequired(3, "data", string())}); + + auto partitioned_spec_result = PartitionSpec::Make( + 0, {PartitionField(2, 1000, "category", Transform::Identity())}); + ASSERT_THAT(partitioned_spec_result, IsOk()); + auto partitioned_spec = + std::shared_ptr(std::move(*partitioned_spec_result)); + + FileWriterFactory factory(partitioned_schema, partitioned_spec, io_); + + PartitionValues partition; + partition.AddValue(Literal::String("electronics")); + + auto result = factory.NewDataWriter("/test/data/category=electronics/file.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(result, IsOk()); +} + +TEST_F(WriterEdgeCaseTest, ReconfigureEqualityDeleteConfig) { + FileWriterFactory factory(schema_, spec_, io_); + + // Set initial config + factory.SetEqualityDeleteConfig(schema_, {1}); + + PartitionValues partition; + auto writer1 = factory.NewEqualityDeleteWriter("/test/eq1.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer1, IsOk()); + ASSERT_EQ((*writer1)->equality_field_ids(), std::vector({1})); + + // Reconfigure with different field IDs + factory.SetEqualityDeleteConfig(schema_, {1, 2}); + + auto writer2 = factory.NewEqualityDeleteWriter("/test/eq2.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer2, IsOk()); + ASSERT_EQ((*writer2)->equality_field_ids(), std::vector({1, 2})); +} + +TEST_F(WriterEdgeCaseTest, ReconfigurePositionDeleteRowSchema) { + FileWriterFactory factory(schema_, spec_, io_); + + auto row_schema1 = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32())}); + factory.SetPositionDeleteRowSchema(row_schema1); + + PartitionValues partition; + auto writer1 = factory.NewPositionDeleteWriter("/test/pos1.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer1, IsOk()); + + // Reconfigure with different row schema + auto row_schema2 = std::make_shared( + std::vector{SchemaField::MakeRequired(1, "id", int32()), + SchemaField::MakeRequired(2, "data", string())}); + factory.SetPositionDeleteRowSchema(row_schema2); + + auto writer2 = factory.NewPositionDeleteWriter("/test/pos2.parquet", + FileFormatType::kParquet, partition); + ASSERT_THAT(writer2, IsOk()); +} + } // namespace iceberg From bd1a6e485b8cc15ad2b34ec8732c22ba2dfd7068 Mon Sep 17 00:00:00 2001 From: Gang Wu Date: Fri, 16 Jan 2026 16:35:18 +0800 Subject: [PATCH 2/2] remove most unnecessary lines to focus on interface design --- src/iceberg/CMakeLists.txt | 1 - src/iceberg/data/data_writer.cc | 43 +- src/iceberg/data/data_writer.h | 14 +- src/iceberg/data/equality_delete_writer.cc | 47 +- src/iceberg/data/equality_delete_writer.h | 19 +- src/iceberg/data/file_writer_factory.cc | 167 ---- src/iceberg/data/file_writer_factory.h | 83 -- src/iceberg/data/position_delete_writer.cc | 51 +- src/iceberg/data/position_delete_writer.h | 16 +- src/iceberg/data/writer.h | 44 +- src/iceberg/test/data_writer_test.cc | 910 +-------------------- 11 files changed, 30 insertions(+), 1365 deletions(-) delete mode 100644 src/iceberg/data/file_writer_factory.cc delete mode 100644 src/iceberg/data/file_writer_factory.h diff --git a/src/iceberg/CMakeLists.txt b/src/iceberg/CMakeLists.txt index 190114645..42442281b 100644 --- a/src/iceberg/CMakeLists.txt +++ b/src/iceberg/CMakeLists.txt @@ -22,7 +22,6 @@ set(ICEBERG_SOURCES catalog/memory/in_memory_catalog.cc data/data_writer.cc data/equality_delete_writer.cc - data/file_writer_factory.cc data/position_delete_writer.cc data/writer.cc delete_file_index.cc diff --git a/src/iceberg/data/data_writer.cc b/src/iceberg/data/data_writer.cc index d3648294c..0998e9efb 100644 --- a/src/iceberg/data/data_writer.cc +++ b/src/iceberg/data/data_writer.cc @@ -21,53 +21,18 @@ namespace iceberg { -//============================================================================= -// DataWriter - stub implementation (to be completed in separate PR per #441) -//============================================================================= - class DataWriter::Impl { public: - explicit Impl(DataWriterOptions options) : options_(std::move(options)) {} - DataWriterOptions options_; - bool is_closed_ = false; }; -DataWriter::DataWriter(std::unique_ptr impl) : impl_(std::move(impl)) {} DataWriter::~DataWriter() = default; -Status DataWriter::Write(ArrowArray* data) { - if (!data) { - return InvalidArgument("Cannot write null data"); - } - if (impl_->is_closed_) { - return Invalid("Writer is already closed"); - } - return NotImplemented("DataWriter not yet implemented - see #441"); -} - -Result DataWriter::Length() const { - return NotImplemented("DataWriter not yet implemented - see #441"); -} +Status DataWriter::Write(ArrowArray* data) { return NotImplemented(""); } -Status DataWriter::Close() { - if (impl_->is_closed_) { - return {}; // Close is idempotent - } - impl_->is_closed_ = true; - return NotImplemented("DataWriter not yet implemented - see #441"); -} +Result DataWriter::Length() const { return NotImplemented(""); } -Result DataWriter::Metadata() { - if (!impl_->is_closed_) { - return Invalid("Writer must be closed before getting metadata"); - } - return NotImplemented("DataWriter not yet implemented - see #441"); -} +Status DataWriter::Close() { return NotImplemented(""); } -// Internal factory function for FileWriterFactory -std::unique_ptr MakeDataWriterInternal(const DataWriterOptions& options) { - auto impl = std::make_unique(options); - return std::unique_ptr(new DataWriter(std::move(impl))); -} +Result DataWriter::Metadata() { return NotImplemented(""); } } // namespace iceberg diff --git a/src/iceberg/data/data_writer.h b/src/iceberg/data/data_writer.h index cd7e4d978..08ac5f70f 100644 --- a/src/iceberg/data/data_writer.h +++ b/src/iceberg/data/data_writer.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "iceberg/arrow_c_data.h" #include "iceberg/data/writer.h" @@ -38,11 +39,6 @@ namespace iceberg { /// \brief Options for creating a DataWriter. -/// -/// \note The following features from Java DataWriter are not yet supported: -/// - Encryption key metadata (uses FileIO instead of EncryptedOutputFile) -/// - Metrics collection and reporting -/// - Split offsets tracking struct ICEBERG_EXPORT DataWriterOptions { std::string path; std::shared_ptr schema; @@ -51,13 +47,10 @@ struct ICEBERG_EXPORT DataWriterOptions { FileFormatType format = FileFormatType::kParquet; std::shared_ptr io; std::optional sort_order_id; - std::shared_ptr properties; + std::unordered_map properties; }; /// \brief Writer for Iceberg data files. -/// -/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only -/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently. class ICEBERG_EXPORT DataWriter : public FileWriter { public: ~DataWriter() override; @@ -68,11 +61,8 @@ class ICEBERG_EXPORT DataWriter : public FileWriter { Result Metadata() override; private: - friend class FileWriterFactory; - friend std::unique_ptr MakeDataWriterInternal(const DataWriterOptions&); class Impl; std::unique_ptr impl_; - explicit DataWriter(std::unique_ptr impl); }; } // namespace iceberg diff --git a/src/iceberg/data/equality_delete_writer.cc b/src/iceberg/data/equality_delete_writer.cc index 8d3487e5e..3edb942cb 100644 --- a/src/iceberg/data/equality_delete_writer.cc +++ b/src/iceberg/data/equality_delete_writer.cc @@ -21,59 +21,22 @@ namespace iceberg { -//============================================================================= -// EqualityDeleteWriter - stub implementation (to be completed in separate PR per #441) -//============================================================================= - class EqualityDeleteWriter::Impl { public: - explicit Impl(EqualityDeleteWriterOptions options) : options_(std::move(options)) {} - EqualityDeleteWriterOptions options_; - bool is_closed_ = false; }; -EqualityDeleteWriter::EqualityDeleteWriter(std::unique_ptr impl) - : impl_(std::move(impl)) {} EqualityDeleteWriter::~EqualityDeleteWriter() = default; -Status EqualityDeleteWriter::Write(ArrowArray* data) { - if (!data) { - return InvalidArgument("Cannot write null data"); - } - if (impl_->is_closed_) { - return Invalid("Writer is already closed"); - } - return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); -} +Status EqualityDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); } -Result EqualityDeleteWriter::Length() const { - return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); -} +Result EqualityDeleteWriter::Length() const { return NotImplemented(""); } -Status EqualityDeleteWriter::Close() { - if (impl_->is_closed_) { - return {}; // Close is idempotent - } - impl_->is_closed_ = true; - return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); -} +Status EqualityDeleteWriter::Close() { return NotImplemented(""); } Result EqualityDeleteWriter::Metadata() { - if (!impl_->is_closed_) { - return Invalid("Writer must be closed before getting metadata"); - } - return NotImplemented("EqualityDeleteWriter not yet implemented - see #441"); + return NotImplemented(""); } -const std::vector& EqualityDeleteWriter::equality_field_ids() const { - return impl_->options_.equality_field_ids; -} - -// Internal factory function for FileWriterFactory -std::unique_ptr MakeEqualityDeleteWriterInternal( - const EqualityDeleteWriterOptions& options) { - auto impl = std::make_unique(options); - return std::unique_ptr(new EqualityDeleteWriter(std::move(impl))); -} +std::span EqualityDeleteWriter::equality_field_ids() const { return {}; } } // namespace iceberg diff --git a/src/iceberg/data/equality_delete_writer.h b/src/iceberg/data/equality_delete_writer.h index 100e96194..9de4918dc 100644 --- a/src/iceberg/data/equality_delete_writer.h +++ b/src/iceberg/data/equality_delete_writer.h @@ -25,8 +25,9 @@ #include #include #include +#include #include -#include +#include #include "iceberg/arrow_c_data.h" #include "iceberg/data/writer.h" @@ -39,11 +40,6 @@ namespace iceberg { /// \brief Options for creating an EqualityDeleteWriter. -/// -/// \note The following features from Java EqualityDeleteWriter are not yet supported: -/// - Encryption key metadata -/// - Metrics collection and reporting -/// - Split offsets tracking struct ICEBERG_EXPORT EqualityDeleteWriterOptions { std::string path; std::shared_ptr schema; @@ -53,13 +49,10 @@ struct ICEBERG_EXPORT EqualityDeleteWriterOptions { std::shared_ptr io; std::vector equality_field_ids; std::optional sort_order_id; - std::shared_ptr properties; + std::unordered_map properties; }; /// \brief Writer for Iceberg equality delete files. -/// -/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only -/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently. class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { public: ~EqualityDeleteWriter() override; @@ -69,15 +62,11 @@ class ICEBERG_EXPORT EqualityDeleteWriter : public FileWriter { Status Close() override; Result Metadata() override; - const std::vector& equality_field_ids() const; + std::span equality_field_ids() const; private: - friend class FileWriterFactory; - friend std::unique_ptr MakeEqualityDeleteWriterInternal( - const EqualityDeleteWriterOptions&); class Impl; std::unique_ptr impl_; - explicit EqualityDeleteWriter(std::unique_ptr impl); }; } // namespace iceberg diff --git a/src/iceberg/data/file_writer_factory.cc b/src/iceberg/data/file_writer_factory.cc deleted file mode 100644 index 69834be8b..000000000 --- a/src/iceberg/data/file_writer_factory.cc +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "iceberg/data/file_writer_factory.h" - -#include "iceberg/data/data_writer.h" -#include "iceberg/data/equality_delete_writer.h" -#include "iceberg/data/position_delete_writer.h" - -namespace iceberg { - -// Forward declarations for internal factory functions -std::unique_ptr MakeDataWriterInternal(const DataWriterOptions& options); -std::unique_ptr MakePositionDeleteWriterInternal( - const PositionDeleteWriterOptions& options); -std::unique_ptr MakeEqualityDeleteWriterInternal( - const EqualityDeleteWriterOptions& options); - -//============================================================================= -// FileWriterFactory::Impl -//============================================================================= - -class FileWriterFactory::Impl { - public: - Impl(std::shared_ptr schema, std::shared_ptr spec, - std::shared_ptr io, std::shared_ptr properties) - : schema_(std::move(schema)), - spec_(std::move(spec)), - io_(std::move(io)), - properties_(std::move(properties)) {} - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; - std::shared_ptr properties_; - - std::shared_ptr eq_delete_schema_; - std::vector equality_field_ids_; - std::shared_ptr pos_delete_row_schema_; -}; - -//============================================================================= -// FileWriterFactory -//============================================================================= - -FileWriterFactory::FileWriterFactory(std::shared_ptr schema, - std::shared_ptr spec, - std::shared_ptr io, - std::shared_ptr properties) - : impl_(std::make_unique(std::move(schema), std::move(spec), std::move(io), - std::move(properties))) {} - -FileWriterFactory::~FileWriterFactory() = default; - -void FileWriterFactory::SetEqualityDeleteConfig(std::shared_ptr eq_delete_schema, - std::vector equality_field_ids) { - impl_->eq_delete_schema_ = std::move(eq_delete_schema); - impl_->equality_field_ids_ = std::move(equality_field_ids); -} - -void FileWriterFactory::SetPositionDeleteRowSchema( - std::shared_ptr pos_delete_row_schema) { - impl_->pos_delete_row_schema_ = std::move(pos_delete_row_schema); -} - -Result> FileWriterFactory::NewDataWriter( - std::string path, FileFormatType format, PartitionValues partition, - std::optional sort_order_id) { - // Input validation - if (path.empty()) { - return InvalidArgument("Path cannot be empty"); - } - if (!impl_->schema_) { - return InvalidArgument("Schema cannot be null"); - } - if (!impl_->spec_) { - return InvalidArgument("PartitionSpec cannot be null"); - } - - DataWriterOptions options; - options.path = std::move(path); - options.schema = impl_->schema_; - options.spec = impl_->spec_; - options.partition = std::move(partition); - options.format = format; - options.io = impl_->io_; - options.sort_order_id = sort_order_id; - options.properties = impl_->properties_; - - return MakeDataWriterInternal(options); -} - -Result> FileWriterFactory::NewPositionDeleteWriter( - std::string path, FileFormatType format, PartitionValues partition) { - // Input validation - if (path.empty()) { - return InvalidArgument("Path cannot be empty"); - } - if (!impl_->schema_) { - return InvalidArgument("Schema cannot be null"); - } - if (!impl_->spec_) { - return InvalidArgument("PartitionSpec cannot be null"); - } - - PositionDeleteWriterOptions options; - options.path = std::move(path); - options.schema = impl_->schema_; - options.spec = impl_->spec_; - options.partition = std::move(partition); - options.format = format; - options.io = impl_->io_; - options.row_schema = impl_->pos_delete_row_schema_; - options.properties = impl_->properties_; - - return MakePositionDeleteWriterInternal(options); -} - -Result> FileWriterFactory::NewEqualityDeleteWriter( - std::string path, FileFormatType format, PartitionValues partition, - std::optional sort_order_id) { - // Input validation - if (path.empty()) { - return InvalidArgument("Path cannot be empty"); - } - if (!impl_->schema_) { - return InvalidArgument("Schema cannot be null"); - } - if (!impl_->spec_) { - return InvalidArgument("PartitionSpec cannot be null"); - } - if (impl_->equality_field_ids_.empty()) { - return InvalidArgument( - "Equality field IDs cannot be empty - call SetEqualityDeleteConfig first"); - } - - EqualityDeleteWriterOptions options; - options.path = std::move(path); - options.schema = impl_->eq_delete_schema_ ? impl_->eq_delete_schema_ : impl_->schema_; - options.spec = impl_->spec_; - options.partition = std::move(partition); - options.format = format; - options.io = impl_->io_; - options.equality_field_ids = impl_->equality_field_ids_; - options.sort_order_id = sort_order_id; - options.properties = impl_->properties_; - - return MakeEqualityDeleteWriterInternal(options); -} - -} // namespace iceberg diff --git a/src/iceberg/data/file_writer_factory.h b/src/iceberg/data/file_writer_factory.h deleted file mode 100644 index af0e99c6a..000000000 --- a/src/iceberg/data/file_writer_factory.h +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#pragma once - -/// \file iceberg/data/file_writer_factory.h -/// Factory for creating Iceberg file writers. - -#include -#include -#include -#include -#include - -#include "iceberg/file_format.h" -#include "iceberg/iceberg_export.h" -#include "iceberg/result.h" -#include "iceberg/row/partition_values.h" -#include "iceberg/type_fwd.h" - -namespace iceberg { - -// Forward declarations -class DataWriter; -class PositionDeleteWriter; -class EqualityDeleteWriter; - -/// \brief Factory for creating Iceberg file writers. -/// -/// \warning Thread Safety: This class is NOT thread-safe. Each FileWriterFactory instance -/// should only be used by a single thread. To use from multiple threads, create separate -/// factory instances or use external synchronization. -/// -/// \note Differences from Java FileWriterFactory: -/// - Java uses EncryptedOutputFile parameter, C++ uses path + FileIO -/// - C++ factory has state (schema, spec, io) configured once, reused for all writers -/// - Java FileWriterFactory is an interface, C++ is a concrete class with configuration -/// - C++ provides SetEqualityDeleteConfig() and SetPositionDeleteRowSchema() for -/// customization -class ICEBERG_EXPORT FileWriterFactory { - public: - FileWriterFactory(std::shared_ptr schema, std::shared_ptr spec, - std::shared_ptr io, - std::shared_ptr properties = nullptr); - ~FileWriterFactory(); - - void SetEqualityDeleteConfig(std::shared_ptr eq_delete_schema, - std::vector equality_field_ids); - void SetPositionDeleteRowSchema(std::shared_ptr pos_delete_row_schema); - - Result> NewDataWriter( - std::string path, FileFormatType format, PartitionValues partition, - std::optional sort_order_id = std::nullopt); - - Result> NewPositionDeleteWriter( - std::string path, FileFormatType format, PartitionValues partition); - - Result> NewEqualityDeleteWriter( - std::string path, FileFormatType format, PartitionValues partition, - std::optional sort_order_id = std::nullopt); - - private: - class Impl; - std::unique_ptr impl_; -}; - -} // namespace iceberg diff --git a/src/iceberg/data/position_delete_writer.cc b/src/iceberg/data/position_delete_writer.cc index 836b47532..f58741083 100644 --- a/src/iceberg/data/position_delete_writer.cc +++ b/src/iceberg/data/position_delete_writer.cc @@ -21,65 +21,24 @@ namespace iceberg { -//============================================================================= -// PositionDeleteWriter - stub implementation (to be completed in separate PR per #441) -//============================================================================= - class PositionDeleteWriter::Impl { public: - explicit Impl(PositionDeleteWriterOptions options) : options_(std::move(options)) {} - PositionDeleteWriterOptions options_; - bool is_closed_ = false; }; -PositionDeleteWriter::PositionDeleteWriter(std::unique_ptr impl) - : impl_(std::move(impl)) {} PositionDeleteWriter::~PositionDeleteWriter() = default; -Status PositionDeleteWriter::Write(ArrowArray* data) { - if (!data) { - return InvalidArgument("Cannot write null data"); - } - if (impl_->is_closed_) { - return Invalid("Writer is already closed"); - } - return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); -} +Status PositionDeleteWriter::Write(ArrowArray* data) { return NotImplemented(""); } Status PositionDeleteWriter::WriteDelete(std::string_view file_path, int64_t pos) { - if (file_path.empty()) { - return InvalidArgument("File path cannot be empty"); - } - if (impl_->is_closed_) { - return Invalid("Writer is already closed"); - } - return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); + return NotImplemented(""); } -Result PositionDeleteWriter::Length() const { - return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); -} +Result PositionDeleteWriter::Length() const { return NotImplemented(""); } -Status PositionDeleteWriter::Close() { - if (impl_->is_closed_) { - return {}; // Close is idempotent - } - impl_->is_closed_ = true; - return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); -} +Status PositionDeleteWriter::Close() { return NotImplemented(""); } Result PositionDeleteWriter::Metadata() { - if (!impl_->is_closed_) { - return Invalid("Writer must be closed before getting metadata"); - } - return NotImplemented("PositionDeleteWriter not yet implemented - see #441"); -} - -// Internal factory function for FileWriterFactory -std::unique_ptr MakePositionDeleteWriterInternal( - const PositionDeleteWriterOptions& options) { - auto impl = std::make_unique(options); - return std::unique_ptr(new PositionDeleteWriter(std::move(impl))); + return NotImplemented(""); } } // namespace iceberg diff --git a/src/iceberg/data/position_delete_writer.h b/src/iceberg/data/position_delete_writer.h index 849d65224..c660812cd 100644 --- a/src/iceberg/data/position_delete_writer.h +++ b/src/iceberg/data/position_delete_writer.h @@ -26,6 +26,7 @@ #include #include #include +#include #include "iceberg/arrow_c_data.h" #include "iceberg/data/writer.h" @@ -38,12 +39,6 @@ namespace iceberg { /// \brief Options for creating a PositionDeleteWriter. -/// -/// \note The following features from Java PositionDeleteWriter are not yet supported: -/// - Encryption key metadata -/// - Referenced data files tracking (CharSequenceSet referencedDataFiles) -/// - Metrics stripping for multi-file deletes -/// - Split offsets tracking struct ICEBERG_EXPORT PositionDeleteWriterOptions { std::string path; std::shared_ptr schema; @@ -52,13 +47,10 @@ struct ICEBERG_EXPORT PositionDeleteWriterOptions { FileFormatType format = FileFormatType::kParquet; std::shared_ptr io; std::shared_ptr row_schema; // Optional row data schema - std::shared_ptr properties; + std::unordered_map properties; }; /// \brief Writer for Iceberg position delete files. -/// -/// \warning Thread Safety: Writer instances are NOT thread-safe. Each writer should only -/// be used by a single thread. Do not call Write(), Close(), or Metadata() concurrently. class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter { public: ~PositionDeleteWriter() override; @@ -70,12 +62,8 @@ class ICEBERG_EXPORT PositionDeleteWriter : public FileWriter { Result Metadata() override; private: - friend class FileWriterFactory; - friend std::unique_ptr MakePositionDeleteWriterInternal( - const PositionDeleteWriterOptions&); class Impl; std::unique_ptr impl_; - explicit PositionDeleteWriter(std::unique_ptr impl); }; } // namespace iceberg diff --git a/src/iceberg/data/writer.h b/src/iceberg/data/writer.h index 371f123df..82c1d0cdc 100644 --- a/src/iceberg/data/writer.h +++ b/src/iceberg/data/writer.h @@ -28,66 +28,36 @@ #include "iceberg/arrow_c_data.h" #include "iceberg/iceberg_export.h" -#include "iceberg/manifest/manifest_entry.h" #include "iceberg/result.h" +#include "iceberg/type_fwd.h" namespace iceberg { /// \brief Base interface for data file writers. -/// -/// This interface defines the common operations for writing Iceberg data files, -/// including data files, equality delete files, and position delete files. -/// -/// Typical usage: -/// 1. Create a writer instance (via concrete implementation) -/// 2. Call Write() one or more times to write data -/// 3. Call Close() to finalize the file -/// 4. Call Metadata() to get file metadata (only valid after Close()) class ICEBERG_EXPORT FileWriter { public: virtual ~FileWriter(); /// \brief Write a batch of records. - /// - /// \param data Arrow array containing the records to write. - /// \return Status indicating success or failure. + /// \note The ownership of the ArrowArray will be transferred to the writer. virtual Status Write(ArrowArray* data) = 0; /// \brief Get the current number of bytes written. - /// - /// \return Result containing the number of bytes written or an error. virtual Result Length() const = 0; /// \brief Close the writer and finalize the file. - /// - /// \return Status indicating success or failure. virtual Status Close() = 0; - /// \brief File metadata for all files produced by the writer. - /// - /// \note The following features from Java are not yet supported: - /// - Encryption key metadata (EncryptionKeyMetadata) - /// - Split offsets for data files - /// - Referenced data files tracking for position deletes + /// \brief File metadata for all files produced by this writer. struct ICEBERG_EXPORT WriteResult { - /// Data files produced by this writer. - /// - /// \note When multiple files are produced: - /// - Position delete writers may produce multiple file-scoped delete files (one per - /// referenced data file) to avoid large manifests - /// - Future feature: File rolling will produce multiple files when size/record - /// thresholds - /// are exceeded - /// - Current implementation: Each writer produces exactly one file + /// Usually a writer produces a single data or delete file. + /// Position delete writer may produce multiple file-scoped delete files. + /// In the future, multiple files can be produced if file rolling is supported. std::vector> data_files; }; /// \brief Get file metadata for all files produced by this writer. - /// - /// This method should be called after Close() to retrieve the metadata - /// for all files written by this writer. - /// - /// \return Result containing the write result or an error. + /// \note This method should be called after Close(). virtual Result Metadata() = 0; }; diff --git a/src/iceberg/test/data_writer_test.cc b/src/iceberg/test/data_writer_test.cc index 8ba3ab8dc..9379becb9 100644 --- a/src/iceberg/test/data_writer_test.cc +++ b/src/iceberg/test/data_writer_test.cc @@ -17,915 +17,7 @@ * under the License. */ -#include "iceberg/data/data_writer.h" - -#include -#include - #include #include -#include "iceberg/arrow_c_data.h" -#include "iceberg/data/equality_delete_writer.h" -#include "iceberg/data/file_writer_factory.h" -#include "iceberg/data/position_delete_writer.h" -#include "iceberg/data/writer.h" -#include "iceberg/file_format.h" -#include "iceberg/file_io.h" -#include "iceberg/manifest/manifest_entry.h" -#include "iceberg/partition_spec.h" -#include "iceberg/result.h" -#include "iceberg/row/partition_values.h" -#include "iceberg/schema.h" -#include "iceberg/test/matchers.h" -#include "iceberg/transform.h" - -namespace iceberg { - -// Mock implementation of FileWriter for testing -class MockFileWriter : public FileWriter { - public: - MockFileWriter() = default; - - Status Write(ArrowArray* data) override { - if (is_closed_) { - return Invalid("Writer is closed"); - } - if (data == nullptr) { - return Invalid("Null data provided"); - } - write_count_++; - // Simulate writing some bytes - bytes_written_ += 1024; - return {}; - } - - Result Length() const override { return bytes_written_; } - - Status Close() override { - if (is_closed_) { - return Invalid("Writer already closed"); - } - is_closed_ = true; - return {}; - } - - Result Metadata() override { - if (!is_closed_) { - return Invalid("Writer must be closed before getting metadata"); - } - - WriteResult result; - auto data_file = std::make_shared(); - data_file->file_path = "/test/data/file.parquet"; - data_file->file_format = FileFormatType::kParquet; - data_file->record_count = write_count_ * 100; - data_file->file_size_in_bytes = bytes_written_; - result.data_files.push_back(data_file); - - return result; - } - - bool is_closed() const { return is_closed_; } - int32_t write_count() const { return write_count_; } - - private: - int64_t bytes_written_ = 0; - bool is_closed_ = false; - int32_t write_count_ = 0; -}; - -TEST(FileWriterTest, BasicWriteOperation) { - MockFileWriter writer; - - // Create a dummy ArrowArray (normally this would contain actual data) - ArrowArray dummy_array = {}; - - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - ASSERT_EQ(writer.write_count(), 1); - - auto length_result = writer.Length(); - ASSERT_THAT(length_result, IsOk()); - ASSERT_EQ(*length_result, 1024); -} - -TEST(FileWriterTest, MultipleWrites) { - MockFileWriter writer; - ArrowArray dummy_array = {}; - - // Write multiple times - for (int i = 0; i < 5; i++) { - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - } - - ASSERT_EQ(writer.write_count(), 5); - - auto length_result = writer.Length(); - ASSERT_THAT(length_result, IsOk()); - ASSERT_EQ(*length_result, 5120); // 5 * 1024 -} - -TEST(FileWriterTest, WriteNullData) { - MockFileWriter writer; - - auto status = writer.Write(nullptr); - ASSERT_THAT(status, HasErrorMessage("Null data provided")); -} - -TEST(FileWriterTest, CloseWriter) { - MockFileWriter writer; - ArrowArray dummy_array = {}; - - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - ASSERT_FALSE(writer.is_closed()); - - ASSERT_THAT(writer.Close(), IsOk()); - ASSERT_TRUE(writer.is_closed()); -} - -TEST(FileWriterTest, DoubleClose) { - MockFileWriter writer; - - ASSERT_THAT(writer.Close(), IsOk()); - auto status = writer.Close(); - ASSERT_THAT(status, HasErrorMessage("Writer already closed")); -} - -TEST(FileWriterTest, WriteAfterClose) { - MockFileWriter writer; - ArrowArray dummy_array = {}; - - ASSERT_THAT(writer.Close(), IsOk()); - - auto status = writer.Write(&dummy_array); - ASSERT_THAT(status, HasErrorMessage("Writer is closed")); -} - -TEST(FileWriterTest, MetadataBeforeClose) { - MockFileWriter writer; - ArrowArray dummy_array = {}; - - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - - auto metadata_result = writer.Metadata(); - ASSERT_THAT(metadata_result, - HasErrorMessage("Writer must be closed before getting metadata")); -} - -TEST(FileWriterTest, MetadataAfterClose) { - MockFileWriter writer; - ArrowArray dummy_array = {}; - - // Write some data - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - ASSERT_THAT(writer.Write(&dummy_array), IsOk()); - - // Close the writer - ASSERT_THAT(writer.Close(), IsOk()); - - // Get metadata - auto metadata_result = writer.Metadata(); - ASSERT_THAT(metadata_result, IsOk()); - - const auto& result = *metadata_result; - ASSERT_EQ(result.data_files.size(), 1); - - const auto& data_file = result.data_files[0]; - ASSERT_EQ(data_file->file_path, "/test/data/file.parquet"); - ASSERT_EQ(data_file->file_format, FileFormatType::kParquet); - ASSERT_EQ(data_file->record_count, 300); // 3 writes * 100 records - ASSERT_EQ(data_file->file_size_in_bytes, 3072); // 3 * 1024 -} - -TEST(FileWriterTest, WriteResultStructure) { - FileWriter::WriteResult result; - - // Test that WriteResult can hold multiple data files - auto data_file1 = std::make_shared(); - data_file1->file_path = "/test/data/file1.parquet"; - data_file1->record_count = 100; - - auto data_file2 = std::make_shared(); - data_file2->file_path = "/test/data/file2.parquet"; - data_file2->record_count = 200; - - result.data_files.push_back(data_file1); - result.data_files.push_back(data_file2); - - ASSERT_EQ(result.data_files.size(), 2); - ASSERT_EQ(result.data_files[0]->file_path, "/test/data/file1.parquet"); - ASSERT_EQ(result.data_files[0]->record_count, 100); - ASSERT_EQ(result.data_files[1]->file_path, "/test/data/file2.parquet"); - ASSERT_EQ(result.data_files[1]->record_count, 200); -} - -TEST(FileWriterTest, EmptyWriteResult) { - FileWriter::WriteResult result; - ASSERT_EQ(result.data_files.size(), 0); - ASSERT_TRUE(result.data_files.empty()); -} - -// Tests for stub implementations (methods return NotImplemented) -class WriterStubTest : public ::testing::Test { - protected: - void SetUp() override { - schema_ = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string())}); - - ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); - io_ = nullptr; // Not needed for stub tests - } - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; -}; - -TEST_F(WriterStubTest, DataWriterMethodsReturnNotImplemented) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewDataWriter("/test/data/file.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - - auto& writer = *writer_result; - ArrowArray dummy_array = {}; - - // All methods should return NotImplemented - ASSERT_THAT(writer->Write(&dummy_array), - HasErrorMessage("DataWriter not yet implemented")); - ASSERT_THAT(writer->Length(), HasErrorMessage("DataWriter not yet implemented")); - ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); - ASSERT_THAT(writer->Metadata(), HasErrorMessage("DataWriter not yet implemented")); -} - -TEST_F(WriterStubTest, PositionDeleteWriterMethodsReturnNotImplemented) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewPositionDeleteWriter( - "/test/deletes/pos_deletes.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - - auto& writer = *writer_result; - ArrowArray dummy_array = {}; - - // All methods should return NotImplemented - ASSERT_THAT(writer->Write(&dummy_array), - HasErrorMessage("PositionDeleteWriter not yet implemented")); - ASSERT_THAT(writer->WriteDelete("/test/file.parquet", 0), - HasErrorMessage("PositionDeleteWriter not yet implemented")); - ASSERT_THAT(writer->Length(), - HasErrorMessage("PositionDeleteWriter not yet implemented")); - ASSERT_THAT(writer->Close(), - HasErrorMessage("PositionDeleteWriter not yet implemented")); - ASSERT_THAT(writer->Metadata(), - HasErrorMessage("PositionDeleteWriter not yet implemented")); -} - -TEST_F(WriterStubTest, EqualityDeleteWriterMethodsReturnNotImplemented) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - - PartitionValues partition; - auto writer_result = factory.NewEqualityDeleteWriter( - "/test/deletes/eq_deletes.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - - auto& writer = *writer_result; - ArrowArray dummy_array = {}; - - // All methods should return NotImplemented - ASSERT_THAT(writer->Write(&dummy_array), - HasErrorMessage("EqualityDeleteWriter not yet implemented")); - ASSERT_THAT(writer->Length(), - HasErrorMessage("EqualityDeleteWriter not yet implemented")); - ASSERT_THAT(writer->Close(), - HasErrorMessage("EqualityDeleteWriter not yet implemented")); - ASSERT_THAT(writer->Metadata(), - HasErrorMessage("EqualityDeleteWriter not yet implemented")); - - // equality_field_ids should return the configured value - ASSERT_EQ(writer->equality_field_ids(), std::vector({1, 2})); -} - -// Tests for FileWriterFactory -class FileWriterFactoryTest : public ::testing::Test { - protected: - void SetUp() override { - schema_ = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string()), - SchemaField::MakeRequired(3, "category", string())}); - - ICEBERG_UNWRAP_OR_FAIL( - spec_, PartitionSpec::Make( - 0, {PartitionField(3, 1000, "category", Transform::Identity())})); - io_ = nullptr; // FileIO not needed for stub tests - } - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; -}; - -TEST_F(FileWriterFactoryTest, ConstructorWithNullProperties) { - FileWriterFactory factory(schema_, spec_, io_, nullptr); - // Should construct successfully without properties -} - -TEST_F(FileWriterFactoryTest, NewDataWriterCreatesWriter) { - FileWriterFactory factory(schema_, spec_, io_); - - PartitionValues partition; - auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, - partition); - - // Factory should successfully create a writer - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewDataWriterWithSortOrder) { - FileWriterFactory factory(schema_, spec_, io_); - - PartitionValues partition; - auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, - partition, - /*sort_order_id=*/1); - - // Factory should successfully create a writer - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewPositionDeleteWriterCreatesWriter) { - FileWriterFactory factory(schema_, spec_, io_); - - PartitionValues partition; - auto result = factory.NewPositionDeleteWriter("/test/deletes/pos_deletes.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewPositionDeleteWriterWithRowSchema) { - FileWriterFactory factory(schema_, spec_, io_); - - auto row_schema = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32())}); - factory.SetPositionDeleteRowSchema(row_schema); - - PartitionValues partition; - auto result = factory.NewPositionDeleteWriter("/test/deletes/pos_deletes.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterCreatesWriter) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Must set config first - - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterWithConfig) { - FileWriterFactory factory(schema_, spec_, io_); - - auto eq_delete_schema = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string())}); - std::vector equality_field_ids = {1, 2}; - factory.SetEqualityDeleteConfig(eq_delete_schema, equality_field_ids); - - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterWithSortOrder) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Must set config first - - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", - FileFormatType::kParquet, partition, - /*sort_order_id=*/1); - - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterUsesDefaultSchema) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(nullptr, {1, 2}); // Set field IDs but no custom schema - - // Don't set custom equality delete schema - should use default schema - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryTest, NewEqualityDeleteWriterUsesCustomSchema) { - FileWriterFactory factory(schema_, spec_, io_); - - auto custom_schema = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32())}); - std::vector equality_field_ids = {1}; - factory.SetEqualityDeleteConfig(custom_schema, equality_field_ids); - - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq_deletes.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, IsOk()); -} - -// Tests for input validation -class FileWriterFactoryInputValidationTest : public ::testing::Test { - protected: - void SetUp() override { - schema_ = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string())}); - ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); - io_ = nullptr; - } - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; -}; - -TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsEmptyPath) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto result = factory.NewDataWriter("", FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsNullSchema) { - FileWriterFactory factory(nullptr, spec_, io_); - PartitionValues partition; - - auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, - partition); - - ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewDataWriterRejectsNullSpec) { - FileWriterFactory factory(schema_, nullptr, io_); - PartitionValues partition; - - auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, - partition); - - ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsEmptyPath) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto result = factory.NewPositionDeleteWriter("", FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsNullSchema) { - FileWriterFactory factory(nullptr, spec_, io_); - PartitionValues partition; - - auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewPositionDeleteWriterRejectsNullSpec) { - FileWriterFactory factory(schema_, nullptr, io_); - PartitionValues partition; - - auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsEmptyPath) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - PartitionValues partition; - - auto result = factory.NewEqualityDeleteWriter("", FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("Path cannot be empty")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsNullSchema) { - FileWriterFactory factory(nullptr, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - PartitionValues partition; - - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("Schema cannot be null")); -} - -TEST_F(FileWriterFactoryInputValidationTest, NewEqualityDeleteWriterRejectsNullSpec) { - FileWriterFactory factory(schema_, nullptr, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - PartitionValues partition; - - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("PartitionSpec cannot be null")); -} - -TEST_F(FileWriterFactoryInputValidationTest, - NewEqualityDeleteWriterRejectsEmptyFieldIds) { - FileWriterFactory factory(schema_, spec_, io_); - // Don't set equality config, so field IDs will be empty - PartitionValues partition; - - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(result, HasErrorMessage("Equality field IDs cannot be empty")); -} - -// Tests for state management in stub writers -class WriterStubStateManagementTest : public ::testing::Test { - protected: - void SetUp() override { - schema_ = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string())}); - ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); - io_ = nullptr; - } - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; -}; - -TEST_F(WriterStubStateManagementTest, DataWriterRejectsWriteAfterClose) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewDataWriter("/test/data/file.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - // Close the writer - ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); - - // Now try to write - should fail because writer is closed - ArrowArray dummy_array = {}; - auto status = writer->Write(&dummy_array); - ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); -} - -TEST_F(WriterStubStateManagementTest, DataWriterCloseIsIdempotent) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewDataWriter("/test/data/file.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - // Close multiple times - second close should succeed (idempotent) - ASSERT_THAT(writer->Close(), HasErrorMessage("DataWriter not yet implemented")); - ASSERT_THAT(writer->Close(), IsOk()); - ASSERT_THAT(writer->Close(), IsOk()); -} - -TEST_F(WriterStubStateManagementTest, DataWriterRejectsNullData) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewDataWriter("/test/data/file.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - auto status = writer->Write(nullptr); - ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); -} - -TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsWriteAfterClose) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewPositionDeleteWriter( - "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - // Close the writer - ASSERT_THAT(writer->Close(), - HasErrorMessage("PositionDeleteWriter not yet implemented")); - - // Try to write - should fail - ArrowArray dummy_array = {}; - auto status = writer->Write(&dummy_array); - ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); -} - -TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsWriteDeleteAfterClose) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewPositionDeleteWriter( - "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - // Close the writer - ASSERT_THAT(writer->Close(), - HasErrorMessage("PositionDeleteWriter not yet implemented")); - - // Try WriteDelete - should fail - auto status = writer->WriteDelete("/test/file.parquet", 100); - ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); -} - -TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsNullData) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewPositionDeleteWriter( - "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - auto status = writer->Write(nullptr); - ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); -} - -TEST_F(WriterStubStateManagementTest, PositionDeleteWriterRejectsEmptyFilePath) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto writer_result = factory.NewPositionDeleteWriter( - "/test/deletes/pos.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - auto status = writer->WriteDelete("", 100); - ASSERT_THAT(status, HasErrorMessage("File path cannot be empty")); -} - -TEST_F(WriterStubStateManagementTest, EqualityDeleteWriterRejectsWriteAfterClose) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - PartitionValues partition; - - auto writer_result = factory.NewEqualityDeleteWriter( - "/test/deletes/eq.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - // Close the writer - ASSERT_THAT(writer->Close(), - HasErrorMessage("EqualityDeleteWriter not yet implemented")); - - // Try to write - should fail - ArrowArray dummy_array = {}; - auto status = writer->Write(&dummy_array); - ASSERT_THAT(status, HasErrorMessage("Writer is already closed")); -} - -TEST_F(WriterStubStateManagementTest, EqualityDeleteWriterRejectsNullData) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - PartitionValues partition; - - auto writer_result = factory.NewEqualityDeleteWriter( - "/test/deletes/eq.parquet", FileFormatType::kParquet, partition); - ASSERT_THAT(writer_result, IsOk()); - auto& writer = *writer_result; - - auto status = writer->Write(nullptr); - ASSERT_THAT(status, HasErrorMessage("Cannot write null data")); -} - -// Tests for different file formats -class FileWriterFactoryFileFormatTest : public ::testing::Test { - protected: - void SetUp() override { - schema_ = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32())}); - ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); - io_ = nullptr; - } - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; -}; - -TEST_F(FileWriterFactoryFileFormatTest, DataWriterSupportsParquet) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto result = factory.NewDataWriter("/test/data/file.parquet", FileFormatType::kParquet, - partition); - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryFileFormatTest, DataWriterSupportsAvro) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto result = - factory.NewDataWriter("/test/data/file.avro", FileFormatType::kAvro, partition); - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryFileFormatTest, PositionDeleteWriterSupportsParquet) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryFileFormatTest, PositionDeleteWriterSupportsAvro) { - FileWriterFactory factory(schema_, spec_, io_); - PartitionValues partition; - - auto result = factory.NewPositionDeleteWriter("/test/deletes/pos.avro", - FileFormatType::kAvro, partition); - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryFileFormatTest, EqualityDeleteWriterSupportsParquet) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1}); - PartitionValues partition; - - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(result, IsOk()); -} - -TEST_F(FileWriterFactoryFileFormatTest, EqualityDeleteWriterSupportsAvro) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1}); - PartitionValues partition; - - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.avro", - FileFormatType::kAvro, partition); - ASSERT_THAT(result, IsOk()); -} - -// Edge case tests -class WriterEdgeCaseTest : public ::testing::Test { - protected: - void SetUp() override { - schema_ = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string())}); - ICEBERG_UNWRAP_OR_FAIL(spec_, PartitionSpec::Make(0, {})); - io_ = nullptr; - } - - std::shared_ptr schema_; - std::shared_ptr spec_; - std::shared_ptr io_; -}; - -TEST_F(WriterEdgeCaseTest, EqualityDeleteWriterWithSingleFieldId) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1}); // Single field ID - - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(result, IsOk()); - - auto& writer = *result; - ASSERT_EQ(writer->equality_field_ids(), std::vector({1})); -} - -TEST_F(WriterEdgeCaseTest, EqualityDeleteWriterWithMultipleFieldIds) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); // Multiple field IDs - - PartitionValues partition; - auto result = factory.NewEqualityDeleteWriter("/test/deletes/eq.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(result, IsOk()); - - auto& writer = *result; - ASSERT_EQ(writer->equality_field_ids(), std::vector({1, 2})); -} - -TEST_F(WriterEdgeCaseTest, MultipleWritersFromSameFactory) { - FileWriterFactory factory(schema_, spec_, io_); - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - - PartitionValues partition; - - // Create multiple writers from the same factory - auto data_writer1 = - factory.NewDataWriter("/test/data1.parquet", FileFormatType::kParquet, partition); - auto data_writer2 = - factory.NewDataWriter("/test/data2.parquet", FileFormatType::kParquet, partition); - auto pos_writer = factory.NewPositionDeleteWriter("/test/pos.parquet", - FileFormatType::kParquet, partition); - auto eq_writer = factory.NewEqualityDeleteWriter("/test/eq.parquet", - FileFormatType::kParquet, partition); - - ASSERT_THAT(data_writer1, IsOk()); - ASSERT_THAT(data_writer2, IsOk()); - ASSERT_THAT(pos_writer, IsOk()); - ASSERT_THAT(eq_writer, IsOk()); -} - -TEST_F(WriterEdgeCaseTest, FactoryWithPartitionedSpec) { - auto partitioned_schema = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "category", string()), - SchemaField::MakeRequired(3, "data", string())}); - - auto partitioned_spec_result = PartitionSpec::Make( - 0, {PartitionField(2, 1000, "category", Transform::Identity())}); - ASSERT_THAT(partitioned_spec_result, IsOk()); - auto partitioned_spec = - std::shared_ptr(std::move(*partitioned_spec_result)); - - FileWriterFactory factory(partitioned_schema, partitioned_spec, io_); - - PartitionValues partition; - partition.AddValue(Literal::String("electronics")); - - auto result = factory.NewDataWriter("/test/data/category=electronics/file.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(result, IsOk()); -} - -TEST_F(WriterEdgeCaseTest, ReconfigureEqualityDeleteConfig) { - FileWriterFactory factory(schema_, spec_, io_); - - // Set initial config - factory.SetEqualityDeleteConfig(schema_, {1}); - - PartitionValues partition; - auto writer1 = factory.NewEqualityDeleteWriter("/test/eq1.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer1, IsOk()); - ASSERT_EQ((*writer1)->equality_field_ids(), std::vector({1})); - - // Reconfigure with different field IDs - factory.SetEqualityDeleteConfig(schema_, {1, 2}); - - auto writer2 = factory.NewEqualityDeleteWriter("/test/eq2.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer2, IsOk()); - ASSERT_EQ((*writer2)->equality_field_ids(), std::vector({1, 2})); -} - -TEST_F(WriterEdgeCaseTest, ReconfigurePositionDeleteRowSchema) { - FileWriterFactory factory(schema_, spec_, io_); - - auto row_schema1 = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32())}); - factory.SetPositionDeleteRowSchema(row_schema1); - - PartitionValues partition; - auto writer1 = factory.NewPositionDeleteWriter("/test/pos1.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer1, IsOk()); - - // Reconfigure with different row schema - auto row_schema2 = std::make_shared( - std::vector{SchemaField::MakeRequired(1, "id", int32()), - SchemaField::MakeRequired(2, "data", string())}); - factory.SetPositionDeleteRowSchema(row_schema2); - - auto writer2 = factory.NewPositionDeleteWriter("/test/pos2.parquet", - FileFormatType::kParquet, partition); - ASSERT_THAT(writer2, IsOk()); -} - -} // namespace iceberg +namespace iceberg {} // namespace iceberg