Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/include/metadata/iceberg_snapshot.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,28 @@ struct IcebergTableMetadata;

enum class IcebergSnapshotOperationType : uint8_t { APPEND, REPLACE, OVERWRITE, DELETE };

//! Snapshot summary statistics for Iceberg spec compliance
struct IcebergSnapshotSummary {
//! Cumulative totals (required by some query engines like Redshift)
int64_t total_records = 0;
int64_t total_data_files = 0;
int64_t total_files_size = 0;
int64_t total_delete_files = 0;
int64_t total_position_deletes = 0;
int64_t total_equality_deletes = 0;

//! Delta values for this snapshot
int64_t added_records = 0;
int64_t added_data_files = 0;
int64_t added_files_size = 0;
int64_t deleted_records = 0;
int64_t deleted_data_files = 0;
int64_t removed_files_size = 0;
int64_t changed_partition_count = 0;

bool has_statistics = false;
};

//! An Iceberg snapshot https://iceberg.apache.org/spec/#snapshots
class IcebergSnapshot {
public:
Expand All @@ -27,6 +49,9 @@ class IcebergSnapshot {
IcebergSnapshotOperationType operation;
timestamp_t timestamp_ms;
string manifest_list;

//! Summary statistics
IcebergSnapshotSummary summary;
};

} // namespace duckdb
71 changes: 71 additions & 0 deletions src/metadata/iceberg_snapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,42 @@ rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const {

res.summary.operation = OperationTypeToString(operation);

//! Add summary statistics to additional_properties if available
if (summary.has_statistics) {
auto &props = res.summary.additional_properties;

//! Cumulative totals (required by Redshift and other engines)
props["total-records"] = std::to_string(summary.total_records);
props["total-data-files"] = std::to_string(summary.total_data_files);
props["total-files-size"] = std::to_string(summary.total_files_size);
props["total-delete-files"] = std::to_string(summary.total_delete_files);
props["total-position-deletes"] = std::to_string(summary.total_position_deletes);
props["total-equality-deletes"] = std::to_string(summary.total_equality_deletes);

//! Delta values for this snapshot
if (summary.added_records > 0 || operation == IcebergSnapshotOperationType::APPEND) {
props["added-records"] = std::to_string(summary.added_records);
}
if (summary.added_data_files > 0 || operation == IcebergSnapshotOperationType::APPEND) {
props["added-data-files"] = std::to_string(summary.added_data_files);
}
if (summary.added_files_size > 0 || operation == IcebergSnapshotOperationType::APPEND) {
props["added-files-size"] = std::to_string(summary.added_files_size);
}
if (summary.deleted_records > 0) {
props["deleted-records"] = std::to_string(summary.deleted_records);
}
if (summary.deleted_data_files > 0) {
props["deleted-data-files"] = std::to_string(summary.deleted_data_files);
}
if (summary.removed_files_size > 0) {
props["removed-files-size"] = std::to_string(summary.removed_files_size);
}
if (summary.changed_partition_count > 0) {
props["changed-partition-count"] = std::to_string(summary.changed_partition_count);
}
}

if (!has_parent_snapshot) {
res.has_parent_snapshot_id = false;
} else {
Expand All @@ -43,6 +79,18 @@ rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const {
return res;
}

static int64_t ParseInt64FromProps(const case_insensitive_map_t<string> &props, const string &key, int64_t default_val = 0) {
auto it = props.find(key);
if (it != props.end()) {
try {
return std::stoll(it->second);
} catch (...) {
return default_val;
}
}
return default_val;
}

IcebergSnapshot IcebergSnapshot::ParseSnapshot(rest_api_objects::Snapshot &snapshot, IcebergTableMetadata &metadata) {
IcebergSnapshot ret;
if (metadata.iceberg_version == 1) {
Expand All @@ -57,6 +105,29 @@ IcebergSnapshot IcebergSnapshot::ParseSnapshot(rest_api_objects::Snapshot &snaps
D_ASSERT(snapshot.has_schema_id);
ret.schema_id = snapshot.schema_id;
ret.manifest_list = snapshot.manifest_list;

//! Parse summary statistics from additional_properties if available
auto &props = snapshot.summary.additional_properties;
if (!props.empty()) {
//! Check if total-records exists (key indicator of statistics presence)
if (props.find("total-records") != props.end()) {
ret.summary.has_statistics = true;
ret.summary.total_records = ParseInt64FromProps(props, "total-records");
ret.summary.total_data_files = ParseInt64FromProps(props, "total-data-files");
ret.summary.total_files_size = ParseInt64FromProps(props, "total-files-size");
ret.summary.total_delete_files = ParseInt64FromProps(props, "total-delete-files");
ret.summary.total_position_deletes = ParseInt64FromProps(props, "total-position-deletes");
ret.summary.total_equality_deletes = ParseInt64FromProps(props, "total-equality-deletes");
ret.summary.added_records = ParseInt64FromProps(props, "added-records");
ret.summary.added_data_files = ParseInt64FromProps(props, "added-data-files");
ret.summary.added_files_size = ParseInt64FromProps(props, "added-files-size");
ret.summary.deleted_records = ParseInt64FromProps(props, "deleted-records");
ret.summary.deleted_data_files = ParseInt64FromProps(props, "deleted-data-files");
ret.summary.removed_files_size = ParseInt64FromProps(props, "removed-files-size");
ret.summary.changed_partition_count = ParseInt64FromProps(props, "changed-partition-count");
}
}

return ret;
}

Expand Down
89 changes: 89 additions & 0 deletions src/storage/iceberg_transaction_data.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,55 @@ void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation,
}
}

//! Compute snapshot summary statistics
new_snapshot.summary.has_statistics = true;

//! Get previous snapshot totals if available
IcebergSnapshotSummary prev_summary;
if (new_snapshot.has_parent_snapshot) {
if (!alters.empty()) {
auto &last_alter = alters.back().get();
prev_summary = last_alter.snapshot.summary;
} else {
auto prev_snapshot = table_metadata.GetSnapshotById(new_snapshot.parent_snapshot_id);
if (prev_snapshot && prev_snapshot->summary.has_statistics) {
prev_summary = prev_snapshot->summary;
}
}
}

//! Calculate delta values from data_files
for (const auto &file : data_files) {
if (operation == IcebergSnapshotOperationType::APPEND) {
new_snapshot.summary.added_records += file.record_count;
new_snapshot.summary.added_data_files += 1;
new_snapshot.summary.added_files_size += file.file_size_in_bytes;
} else if (operation == IcebergSnapshotOperationType::DELETE) {
new_snapshot.summary.deleted_records += file.record_count;
new_snapshot.summary.deleted_data_files += 1;
new_snapshot.summary.removed_files_size += file.file_size_in_bytes;
}
}

//! Calculate cumulative totals
if (operation == IcebergSnapshotOperationType::APPEND) {
new_snapshot.summary.total_records = prev_summary.total_records + new_snapshot.summary.added_records;
new_snapshot.summary.total_data_files = prev_summary.total_data_files + new_snapshot.summary.added_data_files;
new_snapshot.summary.total_files_size = prev_summary.total_files_size + new_snapshot.summary.added_files_size;
new_snapshot.summary.total_delete_files = prev_summary.total_delete_files;
new_snapshot.summary.total_position_deletes = prev_summary.total_position_deletes;
new_snapshot.summary.total_equality_deletes = prev_summary.total_equality_deletes;
//! For appends, changed partition count is typically 1 for unpartitioned tables
new_snapshot.summary.changed_partition_count = 1;
} else if (operation == IcebergSnapshotOperationType::DELETE) {
new_snapshot.summary.total_records = prev_summary.total_records;
new_snapshot.summary.total_data_files = prev_summary.total_data_files;
new_snapshot.summary.total_files_size = prev_summary.total_files_size;
new_snapshot.summary.total_delete_files = prev_summary.total_delete_files + new_snapshot.summary.deleted_data_files;
new_snapshot.summary.total_position_deletes = prev_summary.total_position_deletes + new_snapshot.summary.deleted_records;
new_snapshot.summary.total_equality_deletes = prev_summary.total_equality_deletes;
}

auto manifest_content_type = IcebergManifestContentType::DATA;
switch (operation) {
case IcebergSnapshotOperationType::DELETE:
Expand Down Expand Up @@ -172,6 +221,46 @@ void IcebergTransactionData::AddUpdateSnapshot(vector<IcebergManifestEntry> &&de
}
}

//! Compute snapshot summary statistics for OVERWRITE
new_snapshot.summary.has_statistics = true;

//! Get previous snapshot totals if available
IcebergSnapshotSummary prev_summary;
if (new_snapshot.has_parent_snapshot) {
if (!alters.empty()) {
auto &last_alter = alters.back().get();
prev_summary = last_alter.snapshot.summary;
} else {
auto prev_snapshot = table_metadata.GetSnapshotById(new_snapshot.parent_snapshot_id);
if (prev_snapshot && prev_snapshot->summary.has_statistics) {
prev_summary = prev_snapshot->summary;
}
}
}

//! Calculate delta values from delete_files
for (const auto &file : delete_files) {
new_snapshot.summary.deleted_records += file.record_count;
new_snapshot.summary.deleted_data_files += 1;
new_snapshot.summary.removed_files_size += file.file_size_in_bytes;
}

//! Calculate delta values from data_files
for (const auto &file : data_files) {
new_snapshot.summary.added_records += file.record_count;
new_snapshot.summary.added_data_files += 1;
new_snapshot.summary.added_files_size += file.file_size_in_bytes;
}

//! Calculate cumulative totals for OVERWRITE
new_snapshot.summary.total_records = prev_summary.total_records - new_snapshot.summary.deleted_records + new_snapshot.summary.added_records;
new_snapshot.summary.total_data_files = prev_summary.total_data_files - new_snapshot.summary.deleted_data_files + new_snapshot.summary.added_data_files;
new_snapshot.summary.total_files_size = prev_summary.total_files_size - new_snapshot.summary.removed_files_size + new_snapshot.summary.added_files_size;
new_snapshot.summary.total_delete_files = prev_summary.total_delete_files + static_cast<int64_t>(delete_files.size());
new_snapshot.summary.total_position_deletes = prev_summary.total_position_deletes;
new_snapshot.summary.total_equality_deletes = prev_summary.total_equality_deletes;
new_snapshot.summary.changed_partition_count = 1;

auto add_snapshot = make_uniq<IcebergAddSnapshot>(table_info, manifest_list_path, std::move(new_snapshot));
CreateManifestListEntry(*add_snapshot, table_metadata, IcebergManifestContentType::DELETE, std::move(delete_files));

Expand Down
4 changes: 4 additions & 0 deletions src/storage/irc_transaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,10 @@ void CommitTableToJSON(yyjson_mut_doc *doc, yyjson_mut_val *root_object,
yyjson_mut_obj_add_strcpy(doc, snapshot_json, "manifest-list", snapshot.manifest_list.c_str());
auto summary_json = yyjson_mut_obj_add_obj(doc, snapshot_json, "summary");
yyjson_mut_obj_add_strcpy(doc, summary_json, "operation", snapshot.summary.operation.c_str());
//! Add additional properties (statistics) to summary
for (const auto &prop : snapshot.summary.additional_properties) {
yyjson_mut_obj_add_strcpy(doc, summary_json, prop.first.c_str(), prop.second.c_str());
}
yyjson_mut_obj_add_uint(doc, snapshot_json, "schema-id", snapshot.schema_id);
} else if (update.has_set_snapshot_ref_update) {
auto update_json = yyjson_mut_arr_add_obj(doc, updates_array);
Expand Down