diff --git a/src/include/metadata/iceberg_snapshot.hpp b/src/include/metadata/iceberg_snapshot.hpp index eac2bcb5c..e92e43913 100644 --- a/src/include/metadata/iceberg_snapshot.hpp +++ b/src/include/metadata/iceberg_snapshot.hpp @@ -9,6 +9,28 @@ struct IcebergTableMetadata; enum class IcebergSnapshotOperationType : uint8_t { APPEND, REPLACE, OVERWRITE, DELETE }; +//! Snapshot summary statistics for Iceberg spec compliance +struct IcebergSnapshotSummary { + //! Cumulative totals (required by some query engines like Redshift) + int64_t total_records = 0; + int64_t total_data_files = 0; + int64_t total_files_size = 0; + int64_t total_delete_files = 0; + int64_t total_position_deletes = 0; + int64_t total_equality_deletes = 0; + + //! Delta values for this snapshot + int64_t added_records = 0; + int64_t added_data_files = 0; + int64_t added_files_size = 0; + int64_t deleted_records = 0; + int64_t deleted_data_files = 0; + int64_t removed_files_size = 0; + int64_t changed_partition_count = 0; + + bool has_statistics = false; +}; + //! An Iceberg snapshot https://iceberg.apache.org/spec/#snapshots class IcebergSnapshot { public: @@ -27,6 +49,9 @@ class IcebergSnapshot { IcebergSnapshotOperationType operation; timestamp_t timestamp_ms; string manifest_list; + + //! Summary statistics + IcebergSnapshotSummary summary; }; } // namespace duckdb diff --git a/src/metadata/iceberg_snapshot.cpp b/src/metadata/iceberg_snapshot.cpp index 4f7a85185..81da0658a 100644 --- a/src/metadata/iceberg_snapshot.cpp +++ b/src/metadata/iceberg_snapshot.cpp @@ -27,6 +27,42 @@ rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const { res.summary.operation = OperationTypeToString(operation); + //! Add summary statistics to additional_properties if available + if (summary.has_statistics) { + auto &props = res.summary.additional_properties; + + //! Cumulative totals (required by Redshift and other engines) + props["total-records"] = std::to_string(summary.total_records); + props["total-data-files"] = std::to_string(summary.total_data_files); + props["total-files-size"] = std::to_string(summary.total_files_size); + props["total-delete-files"] = std::to_string(summary.total_delete_files); + props["total-position-deletes"] = std::to_string(summary.total_position_deletes); + props["total-equality-deletes"] = std::to_string(summary.total_equality_deletes); + + //! Delta values for this snapshot + if (summary.added_records > 0 || operation == IcebergSnapshotOperationType::APPEND) { + props["added-records"] = std::to_string(summary.added_records); + } + if (summary.added_data_files > 0 || operation == IcebergSnapshotOperationType::APPEND) { + props["added-data-files"] = std::to_string(summary.added_data_files); + } + if (summary.added_files_size > 0 || operation == IcebergSnapshotOperationType::APPEND) { + props["added-files-size"] = std::to_string(summary.added_files_size); + } + if (summary.deleted_records > 0) { + props["deleted-records"] = std::to_string(summary.deleted_records); + } + if (summary.deleted_data_files > 0) { + props["deleted-data-files"] = std::to_string(summary.deleted_data_files); + } + if (summary.removed_files_size > 0) { + props["removed-files-size"] = std::to_string(summary.removed_files_size); + } + if (summary.changed_partition_count > 0) { + props["changed-partition-count"] = std::to_string(summary.changed_partition_count); + } + } + if (!has_parent_snapshot) { res.has_parent_snapshot_id = false; } else { @@ -43,6 +79,18 @@ rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const { return res; } +static int64_t ParseInt64FromProps(const case_insensitive_map_t &props, const string &key, int64_t default_val = 0) { + auto it = props.find(key); + if (it != props.end()) { + try { + return std::stoll(it->second); + } catch (...) { + return default_val; + } + } + return default_val; +} + IcebergSnapshot IcebergSnapshot::ParseSnapshot(rest_api_objects::Snapshot &snapshot, IcebergTableMetadata &metadata) { IcebergSnapshot ret; if (metadata.iceberg_version == 1) { @@ -57,6 +105,29 @@ IcebergSnapshot IcebergSnapshot::ParseSnapshot(rest_api_objects::Snapshot &snaps D_ASSERT(snapshot.has_schema_id); ret.schema_id = snapshot.schema_id; ret.manifest_list = snapshot.manifest_list; + + //! Parse summary statistics from additional_properties if available + auto &props = snapshot.summary.additional_properties; + if (!props.empty()) { + //! Check if total-records exists (key indicator of statistics presence) + if (props.find("total-records") != props.end()) { + ret.summary.has_statistics = true; + ret.summary.total_records = ParseInt64FromProps(props, "total-records"); + ret.summary.total_data_files = ParseInt64FromProps(props, "total-data-files"); + ret.summary.total_files_size = ParseInt64FromProps(props, "total-files-size"); + ret.summary.total_delete_files = ParseInt64FromProps(props, "total-delete-files"); + ret.summary.total_position_deletes = ParseInt64FromProps(props, "total-position-deletes"); + ret.summary.total_equality_deletes = ParseInt64FromProps(props, "total-equality-deletes"); + ret.summary.added_records = ParseInt64FromProps(props, "added-records"); + ret.summary.added_data_files = ParseInt64FromProps(props, "added-data-files"); + ret.summary.added_files_size = ParseInt64FromProps(props, "added-files-size"); + ret.summary.deleted_records = ParseInt64FromProps(props, "deleted-records"); + ret.summary.deleted_data_files = ParseInt64FromProps(props, "deleted-data-files"); + ret.summary.removed_files_size = ParseInt64FromProps(props, "removed-files-size"); + ret.summary.changed_partition_count = ParseInt64FromProps(props, "changed-partition-count"); + } + } + return ret; } diff --git a/src/storage/iceberg_transaction_data.cpp b/src/storage/iceberg_transaction_data.cpp index 0abd4c51b..15d3ade72 100644 --- a/src/storage/iceberg_transaction_data.cpp +++ b/src/storage/iceberg_transaction_data.cpp @@ -116,6 +116,55 @@ void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, } } + //! Compute snapshot summary statistics + new_snapshot.summary.has_statistics = true; + + //! Get previous snapshot totals if available + IcebergSnapshotSummary prev_summary; + if (new_snapshot.has_parent_snapshot) { + if (!alters.empty()) { + auto &last_alter = alters.back().get(); + prev_summary = last_alter.snapshot.summary; + } else { + auto prev_snapshot = table_metadata.GetSnapshotById(new_snapshot.parent_snapshot_id); + if (prev_snapshot && prev_snapshot->summary.has_statistics) { + prev_summary = prev_snapshot->summary; + } + } + } + + //! Calculate delta values from data_files + for (const auto &file : data_files) { + if (operation == IcebergSnapshotOperationType::APPEND) { + new_snapshot.summary.added_records += file.record_count; + new_snapshot.summary.added_data_files += 1; + new_snapshot.summary.added_files_size += file.file_size_in_bytes; + } else if (operation == IcebergSnapshotOperationType::DELETE) { + new_snapshot.summary.deleted_records += file.record_count; + new_snapshot.summary.deleted_data_files += 1; + new_snapshot.summary.removed_files_size += file.file_size_in_bytes; + } + } + + //! Calculate cumulative totals + if (operation == IcebergSnapshotOperationType::APPEND) { + new_snapshot.summary.total_records = prev_summary.total_records + new_snapshot.summary.added_records; + new_snapshot.summary.total_data_files = prev_summary.total_data_files + new_snapshot.summary.added_data_files; + new_snapshot.summary.total_files_size = prev_summary.total_files_size + new_snapshot.summary.added_files_size; + new_snapshot.summary.total_delete_files = prev_summary.total_delete_files; + new_snapshot.summary.total_position_deletes = prev_summary.total_position_deletes; + new_snapshot.summary.total_equality_deletes = prev_summary.total_equality_deletes; + //! For appends, changed partition count is typically 1 for unpartitioned tables + new_snapshot.summary.changed_partition_count = 1; + } else if (operation == IcebergSnapshotOperationType::DELETE) { + new_snapshot.summary.total_records = prev_summary.total_records; + new_snapshot.summary.total_data_files = prev_summary.total_data_files; + new_snapshot.summary.total_files_size = prev_summary.total_files_size; + new_snapshot.summary.total_delete_files = prev_summary.total_delete_files + new_snapshot.summary.deleted_data_files; + new_snapshot.summary.total_position_deletes = prev_summary.total_position_deletes + new_snapshot.summary.deleted_records; + new_snapshot.summary.total_equality_deletes = prev_summary.total_equality_deletes; + } + auto manifest_content_type = IcebergManifestContentType::DATA; switch (operation) { case IcebergSnapshotOperationType::DELETE: @@ -172,6 +221,46 @@ void IcebergTransactionData::AddUpdateSnapshot(vector &&de } } + //! Compute snapshot summary statistics for OVERWRITE + new_snapshot.summary.has_statistics = true; + + //! Get previous snapshot totals if available + IcebergSnapshotSummary prev_summary; + if (new_snapshot.has_parent_snapshot) { + if (!alters.empty()) { + auto &last_alter = alters.back().get(); + prev_summary = last_alter.snapshot.summary; + } else { + auto prev_snapshot = table_metadata.GetSnapshotById(new_snapshot.parent_snapshot_id); + if (prev_snapshot && prev_snapshot->summary.has_statistics) { + prev_summary = prev_snapshot->summary; + } + } + } + + //! Calculate delta values from delete_files + for (const auto &file : delete_files) { + new_snapshot.summary.deleted_records += file.record_count; + new_snapshot.summary.deleted_data_files += 1; + new_snapshot.summary.removed_files_size += file.file_size_in_bytes; + } + + //! Calculate delta values from data_files + for (const auto &file : data_files) { + new_snapshot.summary.added_records += file.record_count; + new_snapshot.summary.added_data_files += 1; + new_snapshot.summary.added_files_size += file.file_size_in_bytes; + } + + //! Calculate cumulative totals for OVERWRITE + new_snapshot.summary.total_records = prev_summary.total_records - new_snapshot.summary.deleted_records + new_snapshot.summary.added_records; + new_snapshot.summary.total_data_files = prev_summary.total_data_files - new_snapshot.summary.deleted_data_files + new_snapshot.summary.added_data_files; + new_snapshot.summary.total_files_size = prev_summary.total_files_size - new_snapshot.summary.removed_files_size + new_snapshot.summary.added_files_size; + new_snapshot.summary.total_delete_files = prev_summary.total_delete_files + static_cast(delete_files.size()); + new_snapshot.summary.total_position_deletes = prev_summary.total_position_deletes; + new_snapshot.summary.total_equality_deletes = prev_summary.total_equality_deletes; + new_snapshot.summary.changed_partition_count = 1; + auto add_snapshot = make_uniq(table_info, manifest_list_path, std::move(new_snapshot)); CreateManifestListEntry(*add_snapshot, table_metadata, IcebergManifestContentType::DELETE, std::move(delete_files)); diff --git a/src/storage/irc_transaction.cpp b/src/storage/irc_transaction.cpp index 288539048..17af75fb3 100644 --- a/src/storage/irc_transaction.cpp +++ b/src/storage/irc_transaction.cpp @@ -77,6 +77,10 @@ void CommitTableToJSON(yyjson_mut_doc *doc, yyjson_mut_val *root_object, yyjson_mut_obj_add_strcpy(doc, snapshot_json, "manifest-list", snapshot.manifest_list.c_str()); auto summary_json = yyjson_mut_obj_add_obj(doc, snapshot_json, "summary"); yyjson_mut_obj_add_strcpy(doc, summary_json, "operation", snapshot.summary.operation.c_str()); + //! Add additional properties (statistics) to summary + for (const auto &prop : snapshot.summary.additional_properties) { + yyjson_mut_obj_add_strcpy(doc, summary_json, prop.first.c_str(), prop.second.c_str()); + } yyjson_mut_obj_add_uint(doc, snapshot_json, "schema-id", snapshot.schema_id); } else if (update.has_set_snapshot_ref_update) { auto update_json = yyjson_mut_arr_add_obj(doc, updates_array);