Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/iceberg/manifest/manifest_entry.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

#include "iceberg/file_format.h"
#include "iceberg/iceberg_export.h"
#include "iceberg/partition_spec.h"
#include "iceberg/result.h"
#include "iceberg/row/partition_values.h"
#include "iceberg/schema_field.h"
Expand Down Expand Up @@ -294,6 +293,11 @@ struct ICEBERG_EXPORT DataFile {

/// \brief Get the schema of the data file with the given partition type.
static std::shared_ptr<StructType> Type(std::shared_ptr<StructType> partition_type);

/// \brief Report whether this file is a deletion vector, i.e. a position-delete
/// file stored in the Puffin format.
bool IsDeletionVector() const {
  const bool is_position_delete = content == Content::kPositionDeletes;
  const bool is_puffin = file_format == FileFormatType::kPuffin;
  return is_position_delete && is_puffin;
}
};

/// \brief A manifest is an immutable Avro file that lists data files or delete files,
Expand Down
300 changes: 300 additions & 0 deletions src/iceberg/snapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,36 @@
#include "iceberg/snapshot.h"

#include <memory>
#include <sstream>
#include <utility>

#include "iceberg/file_io.h"
#include "iceberg/manifest/manifest_entry.h"
#include "iceberg/manifest/manifest_list.h"
#include "iceberg/manifest/manifest_reader.h"
#include "iceberg/util/macros.h"
#include "iceberg/util/string_util.h"

namespace iceberg {

namespace {

/// \brief Store `value` under `property` in `builder`, but only when `condition` holds.
///
/// String-like values (C strings, std::string, or anything convertible to
/// std::string_view) are stored verbatim; every other value is rendered with
/// std::to_string. When `condition` is false the map is left untouched.
template <typename T>
void SetIf(bool condition, std::unordered_map<std::string, std::string>& builder,
           const std::string& property, T value) {
  if (!condition) {
    return;
  }

  constexpr bool kIsStringLike = std::is_same_v<T, const char*> ||
                                 std::is_same_v<T, std::string> ||
                                 std::is_convertible_v<T, std::string_view>;
  if constexpr (kIsStringLike) {
    builder[property] = value;
  } else {
    builder[property] = std::to_string(value);
  }
}

} // namespace

bool SnapshotRef::Branch::Equals(const SnapshotRef::Branch& other) const {
return min_snapshots_to_keep == other.min_snapshots_to_keep &&
max_snapshot_age_ms == other.max_snapshot_age_ms &&
Expand Down Expand Up @@ -255,4 +276,283 @@
return std::span<ManifestFile>(cache.first.data() + delete_start, delete_count);
}

// SnapshotSummaryBuilder::UpdateMetrics implementation

/// \brief Reset every counter to zero and mark the size and delete counts as
/// trusted again.
void SnapshotSummaryBuilder::UpdateMetrics::Clear() {
  // Data file and record counters.
  added_files_ = 0;
  removed_files_ = 0;
  added_records_ = 0;
  deleted_records_ = 0;

  // Delete file counters (total, equality, position, deletion vectors).
  added_delete_files_ = 0;
  removed_delete_files_ = 0;
  added_eq_delete_files_ = 0;
  removed_eq_delete_files_ = 0;
  added_pos_delete_files_ = 0;
  removed_pos_delete_files_ = 0;
  added_dvs_ = 0;
  removed_dvs_ = 0;

  // Delete record counters.
  added_pos_deletes_ = 0;
  removed_pos_deletes_ = 0;
  added_eq_deletes_ = 0;
  removed_eq_deletes_ = 0;

  // Byte totals.
  added_size_ = 0;
  removed_size_ = 0;

  trust_size_and_delete_counts_ = true;
}

void SnapshotSummaryBuilder::UpdateMetrics::AddTo(
std::unordered_map<std::string, std::string>& builder) const {
SetIf(added_files_ > 0, builder, SnapshotSummaryFields::kAddedDataFiles, added_files_);
SetIf(removed_files_ > 0, builder, SnapshotSummaryFields::kDeletedDataFiles,
removed_files_);
SetIf(added_eq_delete_files_ > 0, builder, SnapshotSummaryFields::kAddedEqDeleteFiles,
added_eq_delete_files_);
SetIf(removed_eq_delete_files_ > 0, builder,
SnapshotSummaryFields::kRemovedEqDeleteFiles, removed_eq_delete_files_);
SetIf(added_pos_delete_files_ > 0, builder, SnapshotSummaryFields::kAddedPosDeleteFiles,
added_pos_delete_files_);
SetIf(removed_pos_delete_files_ > 0, builder,
SnapshotSummaryFields::kRemovedPosDeleteFiles, removed_pos_delete_files_);
SetIf(added_delete_files_ > 0, builder, SnapshotSummaryFields::kAddedDeleteFiles,
added_delete_files_);
SetIf(removed_delete_files_ > 0, builder, SnapshotSummaryFields::kRemovedDeleteFiles,
removed_delete_files_);
SetIf(added_dvs_ > 0, builder, SnapshotSummaryFields::kAddedDVs, added_dvs_);
SetIf(removed_dvs_ > 0, builder, SnapshotSummaryFields::kRemovedDVs, removed_dvs_);
SetIf(added_records_ > 0, builder, SnapshotSummaryFields::kAddedRecords,
added_records_);
SetIf(deleted_records_ > 0, builder, SnapshotSummaryFields::kDeletedRecords,
deleted_records_);

if (trust_size_and_delete_counts_) {
SetIf(added_size_ > 0, builder, SnapshotSummaryFields::kAddedFileSize, added_size_);
SetIf(removed_size_ > 0, builder, SnapshotSummaryFields::kRemovedFileSize,
removed_size_);
SetIf(added_pos_deletes_ > 0, builder, SnapshotSummaryFields::kAddedPosDeletes,
added_pos_deletes_);
SetIf(removed_pos_deletes_ > 0, builder, SnapshotSummaryFields::kRemovedPosDeletes,
removed_pos_deletes_);
SetIf(added_eq_deletes_ > 0, builder, SnapshotSummaryFields::kAddedEqDeletes,
added_eq_deletes_);
SetIf(removed_eq_deletes_ > 0, builder, SnapshotSummaryFields::kRemovedEqDeletes,
removed_eq_deletes_);
}
}

/// \brief Account for a file that was added.
///
/// The file size is always added to added_size_. The remaining counters
/// depend on the file content: data files bump the data file and record
/// counts; position deletes bump either the deletion-vector or the
/// position-delete file count plus the total delete file and position-delete
/// record counts; equality deletes bump the equality and total delete file
/// counts plus the equality-delete record count.
///
/// \param file The added file.
void SnapshotSummaryBuilder::UpdateMetrics::AddedFile(const DataFile& file) {
  added_size_ += file.file_size_in_bytes;

  const auto kind = file.content;
  if (kind == DataFile::Content::kData) {
    ++added_files_;
    added_records_ += file.record_count;
  } else if (kind == DataFile::Content::kPositionDeletes) {
    if (file.IsDeletionVector()) {
      ++added_dvs_;
    } else {
      ++added_pos_delete_files_;
    }
    ++added_delete_files_;
    added_pos_deletes_ += file.record_count;
  } else if (kind == DataFile::Content::kEqualityDeletes) {
    ++added_delete_files_;
    ++added_eq_delete_files_;
    added_eq_deletes_ += file.record_count;
  } else {
    // Any other content value is treated as impossible.
    std::unreachable();
  }
}

/// \brief Account for a file that was removed.
///
/// The file size is always added to removed_size_. The remaining counters
/// depend on the file content: data files bump the removed data file and
/// deleted record counts; position deletes bump either the removed
/// deletion-vector or removed position-delete file count plus the total
/// removed delete file and position-delete record counts; equality deletes
/// bump the removed equality and total delete file counts plus the removed
/// equality-delete record count.
///
/// \param file The removed file.
void SnapshotSummaryBuilder::UpdateMetrics::RemovedFile(const DataFile& file) {
  removed_size_ += file.file_size_in_bytes;

  const auto kind = file.content;
  if (kind == DataFile::Content::kData) {
    ++removed_files_;
    deleted_records_ += file.record_count;
  } else if (kind == DataFile::Content::kPositionDeletes) {
    if (file.IsDeletionVector()) {
      ++removed_dvs_;
    } else {
      ++removed_pos_delete_files_;
    }
    ++removed_delete_files_;
    removed_pos_deletes_ += file.record_count;
  } else if (kind == DataFile::Content::kEqualityDeletes) {
    ++removed_delete_files_;
    ++removed_eq_delete_files_;
    removed_eq_deletes_ += file.record_count;
  } else {
    // Any other content value is treated as impossible.
    std::unreachable();
  }
}

/// \brief Fold the file and row counts recorded on a manifest into the metrics.
///
/// Counts missing from the manifest are treated as zero. Data manifests
/// contribute added/deleted file and row counts. Delete manifests contribute
/// only added/deleted delete file counts and clear
/// trust_size_and_delete_counts_.
///
/// \param manifest The manifest whose counts are added.
void SnapshotSummaryBuilder::UpdateMetrics::AddedManifest(const ManifestFile& manifest) {
  const auto added_file_count = manifest.added_files_count.value_or(0);
  const auto deleted_file_count = manifest.deleted_files_count.value_or(0);

  switch (manifest.content) {
    case ManifestContent::kData: {
      added_files_ += added_file_count;
      added_records_ += manifest.added_rows_count.value_or(0);
      removed_files_ += deleted_file_count;
      deleted_records_ += manifest.deleted_rows_count.value_or(0);
      return;
    }
    case ManifestContent::kDeletes: {
      added_delete_files_ += added_file_count;
      removed_delete_files_ += deleted_file_count;
      trust_size_and_delete_counts_ = false;
      return;
    }
    default:
      std::unreachable();
  }
}

/// \brief Add every counter of `other` to this instance.
///
/// trust_size_and_delete_counts_ stays set only if it is set on both sides.
///
/// \param other Metrics to fold into this instance.
void SnapshotSummaryBuilder::UpdateMetrics::Merge(const UpdateMetrics& other) {
  // Data file and record counters.
  added_files_ += other.added_files_;
  removed_files_ += other.removed_files_;
  added_records_ += other.added_records_;
  deleted_records_ += other.deleted_records_;

  // Delete file counters (total, equality, position, deletion vectors).
  added_delete_files_ += other.added_delete_files_;
  removed_delete_files_ += other.removed_delete_files_;
  added_eq_delete_files_ += other.added_eq_delete_files_;
  removed_eq_delete_files_ += other.removed_eq_delete_files_;
  added_pos_delete_files_ += other.added_pos_delete_files_;
  removed_pos_delete_files_ += other.removed_pos_delete_files_;
  added_dvs_ += other.added_dvs_;
  removed_dvs_ += other.removed_dvs_;

  // Delete record counters.
  added_pos_deletes_ += other.added_pos_deletes_;
  removed_pos_deletes_ += other.removed_pos_deletes_;
  added_eq_deletes_ += other.added_eq_deletes_;
  removed_eq_deletes_ += other.removed_eq_deletes_;

  // Byte totals.
  added_size_ += other.added_size_;
  removed_size_ += other.removed_size_;

  if (!other.trust_size_and_delete_counts_) {
    trust_size_and_delete_counts_ = false;
  }
}

// SnapshotSummaryBuilder implementation

/// \brief Reset the accumulated metrics: overall metrics, per-partition
/// metrics and the duplicate-delete count. Partition metrics become trusted
/// again.
void SnapshotSummaryBuilder::Clear() {
  metrics_.Clear();
  partition_metrics_.clear();
  trust_partition_metrics_ = true;
  deleted_duplicate_files_ = 0;
}

/// \brief Set the limit on changed partitions for per-partition summaries.
///
/// Build() writes per-partition summary entries only while the number of
/// changed partitions does not exceed this value.
///
/// \param max The new limit.
void SnapshotSummaryBuilder::SetPartitionSummaryLimit(int32_t max) {
max_changed_partitions_for_summaries_ = max;
}

/// \brief Increase the count of duplicate file deletions.
///
/// \param increment Amount added to the running count.
void SnapshotSummaryBuilder::IncrementDuplicateDeletes(int32_t increment) {
  deleted_duplicate_files_ = deleted_duplicate_files_ + increment;
}

/// \brief Record an added file in the overall metrics and in the metrics of
/// its partition.
///
/// \param spec Partition spec passed on to UpdatePartitions.
/// \param file The added file.
/// \return The status reported by UpdatePartitions.
Status SnapshotSummaryBuilder::AddedFile(const PartitionSpec& spec,
                                         const DataFile& file) {
  metrics_.AddedFile(file);
  return UpdatePartitions(spec, file, /*is_addition=*/true);
}

/// \brief Record a removed file in the overall metrics and in the metrics of
/// its partition.
///
/// \param spec Partition spec passed on to UpdatePartitions.
/// \param file The removed file.
/// \return The status reported by UpdatePartitions.
Status SnapshotSummaryBuilder::DeletedFile(const PartitionSpec& spec,
                                           const DataFile& file) {
  metrics_.RemovedFile(file);
  return UpdatePartitions(spec, file, /*is_addition=*/false);
}

/// \brief Record a whole manifest in the overall metrics.
///
/// Per-partition metrics are discarded and marked untrusted from this point
/// on.
///
/// \param manifest The added manifest.
void SnapshotSummaryBuilder::AddedManifest(const ManifestFile& manifest) {
  metrics_.AddedManifest(manifest);
  partition_metrics_.clear();
  trust_partition_metrics_ = false;
}

/// \brief Set a custom summary property, replacing any value already stored
/// under the same name.
///
/// \param property Property name.
/// \param value Property value.
void SnapshotSummaryBuilder::Set(const std::string& property, const std::string& value) {
  properties_.insert_or_assign(property, value);
}

/// \brief Fold another builder into this one.
///
/// Custom properties from `other` overwrite properties with the same name
/// here. Overall metrics and the duplicate-delete count are summed.
/// Per-partition metrics are merged only while both builders trust them;
/// otherwise they are discarded.
///
/// \param other Builder whose contents are merged in.
void SnapshotSummaryBuilder::Merge(const SnapshotSummaryBuilder& other) {
  for (const auto& property : other.properties_) {
    properties_[property.first] = property.second;
  }
  metrics_.Merge(other.metrics_);

  if (!other.trust_partition_metrics_) {
    trust_partition_metrics_ = false;
  }

  if (!trust_partition_metrics_) {
    partition_metrics_.clear();
  } else {
    for (const auto& partition : other.partition_metrics_) {
      partition_metrics_[partition.first].Merge(partition.second);
    }
  }

  deleted_duplicate_files_ += other.deleted_duplicate_files_;
}

std::unordered_map<std::string, std::string> SnapshotSummaryBuilder::Build() const {
std::unordered_map<std::string, std::string> builder;

// Copy custom summary properties
builder.insert(properties_.begin(), properties_.end());

metrics_.AddTo(builder);

SetIf(deleted_duplicate_files_ > 0, builder,
SnapshotSummaryFields::kDeletedDuplicatedFiles, deleted_duplicate_files_);

SetIf(trust_partition_metrics_, builder,
SnapshotSummaryFields::kChangedPartitionCountProp, partition_metrics_.size());

// Add partition summaries if enabled
if (trust_partition_metrics_ && static_cast<int32_t>(partition_metrics_.size()) <=

Check warning on line 511 in src/iceberg/snapshot.cc

View workflow job for this annotation

GitHub Actions / cpp-linter

src/iceberg/snapshot.cc:511:35 [modernize-use-integer-sign-comparison]

comparison between 'signed' and 'unsigned' integers
max_changed_partitions_for_summaries_) {
SetIf(!partition_metrics_.empty(), builder,
SnapshotSummaryFields::kPartitionSummaryProp, "true");
for (const auto& [key, metrics] : partition_metrics_) {
if (!key.empty()) {
builder[SnapshotSummaryFields::kChangedPartitionPrefix + key] =
PartitionSummary(metrics);
}
}
}

return builder;
}

/// \brief Update the metrics of the partition that `file` belongs to.
///
/// Does nothing while partition metrics are not trusted. Otherwise the
/// partition path obtained from `spec` keys the per-partition metrics entry,
/// which is created on first use.
///
/// \param spec Partition spec used to compute the partition path.
/// \param file The file being added or removed.
/// \param is_addition True to record an addition, false to record a removal.
/// \return An error if the partition path cannot be computed.
Status SnapshotSummaryBuilder::UpdatePartitions(const PartitionSpec& spec,
                                                const DataFile& file, bool is_addition) {
  if (!trust_partition_metrics_) {
    return {};
  }

  ICEBERG_ASSIGN_OR_RAISE(std::string path, spec.PartitionPath(file.partition));
  auto& metrics_for_partition = partition_metrics_[path];
  if (is_addition) {
    metrics_for_partition.AddedFile(file);
  } else {
    metrics_for_partition.RemovedFile(file);
  }
  return {};
}

/// \brief Render the metrics of one partition as a single string.
///
/// The metrics are written into a temporary map via AddTo and joined as
/// comma-separated `key=value` pairs. Pairs appear in the iteration order of
/// that std::unordered_map.
///
/// \param metrics Metrics of one partition.
/// \return The joined string; empty when no metric is emitted.
std::string SnapshotSummaryBuilder::PartitionSummary(const UpdateMetrics& metrics) const {
  std::unordered_map<std::string, std::string> entries;
  metrics.AddTo(entries);

  std::ostringstream joined;
  const char* separator = "";
  for (const auto& entry : entries) {
    joined << separator << entry.first << "=" << entry.second;
    separator = ",";
  }
  return joined.str();
}

} // namespace iceberg
Loading
Loading