diff --git a/src/include/metadata/iceberg_snapshot.hpp b/src/include/metadata/iceberg_snapshot.hpp index a801e3f66..aa292b154 100644 --- a/src/include/metadata/iceberg_snapshot.hpp +++ b/src/include/metadata/iceberg_snapshot.hpp @@ -9,6 +9,15 @@ struct IcebergTableMetadata; enum class IcebergSnapshotOperationType : uint8_t { APPEND, REPLACE, OVERWRITE, DELETE }; +enum class SnapshotMetricType : uint8_t { + ADDED_DATA_FILES, + ADDED_RECORDS, + DELETED_DATA_FILES, + DELETED_RECORDS, + TOTAL_DATA_FILES, + TOTAL_RECORDS +}; + //! An Iceberg snapshot https://iceberg.apache.org/spec/#snapshots class IcebergSnapshot { public: @@ -16,6 +25,7 @@ class IcebergSnapshot { } static IcebergSnapshot ParseSnapshot(rest_api_objects::Snapshot &snapshot, IcebergTableMetadata &metadata); rest_api_objects::Snapshot ToRESTObject() const; + using metrics_map_t = std::map; public: //! Snapshot metadata @@ -27,6 +37,7 @@ class IcebergSnapshot { IcebergSnapshotOperationType operation = IcebergSnapshotOperationType::APPEND; timestamp_t timestamp_ms; string manifest_list; + metrics_map_t metrics; }; } // namespace duckdb diff --git a/src/metadata/iceberg_snapshot.cpp b/src/metadata/iceberg_snapshot.cpp index 4f7a85185..eae3815cd 100644 --- a/src/metadata/iceberg_snapshot.cpp +++ b/src/metadata/iceberg_snapshot.cpp @@ -18,6 +18,40 @@ static string OperationTypeToString(IcebergSnapshotOperationType type) { } } +static const std::map kSnapshotMetricKeys = { + {SnapshotMetricType::ADDED_DATA_FILES, "added-data-files"}, + {SnapshotMetricType::ADDED_RECORDS, "added-records"}, + {SnapshotMetricType::DELETED_DATA_FILES, "deleted-data-files"}, + {SnapshotMetricType::DELETED_RECORDS, "deleted-records"}, + {SnapshotMetricType::TOTAL_DATA_FILES, "total-data-files"}, + {SnapshotMetricType::TOTAL_RECORDS, "total-records"}}; + +static string MetricsTypeToString(SnapshotMetricType type) { + auto entry = kSnapshotMetricKeys.find(type); + if (entry == kSnapshotMetricKeys.end()) { + throw InvalidConfigurationException("Metrics type not implemented: %d", static_cast(type)); + } + return entry->second; +} + +static IcebergSnapshot::metrics_map_t MetricsFromSummary(const case_insensitive_map_t &snapshot_summary) { + IcebergSnapshot::metrics_map_t metrics; + for (auto &entry : kSnapshotMetricKeys) { + auto it = snapshot_summary.find(entry.second); + if (it != snapshot_summary.end()) { + int64_t value; + try { + value = std::stoll(it->second); + } catch (...) { + // Skip invalid metrics + continue; + } + metrics[entry.first] = value; + } + } + return metrics; +} + rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const { rest_api_objects::Snapshot res; @@ -26,6 +60,9 @@ rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const { res.manifest_list = manifest_list; res.summary.operation = OperationTypeToString(operation); + for (auto &entry : metrics) { + res.summary.additional_properties[MetricsTypeToString(entry.first)] = std::to_string(entry.second); + } if (!has_parent_snapshot) { res.has_parent_snapshot_id = false; @@ -57,6 +94,8 @@ IcebergSnapshot IcebergSnapshot::ParseSnapshot(rest_api_objects::Snapshot &snaps D_ASSERT(snapshot.has_schema_id); ret.schema_id = snapshot.schema_id; ret.manifest_list = snapshot.manifest_list; + ret.metrics = MetricsFromSummary(snapshot.summary.additional_properties); + return ret; } diff --git a/src/storage/iceberg_transaction_data.cpp b/src/storage/iceberg_transaction_data.cpp index 50000d181..13e518a67 100644 --- a/src/storage/iceberg_transaction_data.cpp +++ b/src/storage/iceberg_transaction_data.cpp @@ -17,6 +17,35 @@ static int64_t NewSnapshotId() { return random_number; } +static const IcebergSnapshot::metrics_map_t kMetricsWithEmptyTotals {{SnapshotMetricType::TOTAL_DATA_FILES, 0}, + {SnapshotMetricType::TOTAL_RECORDS, 0}}; + +static IcebergSnapshot::metrics_map_t GetSnapshotMetrics(const IcebergManifest &manifest, + const IcebergSnapshot::metrics_map_t &previous_metrics) { + IcebergSnapshot::metrics_map_t metrics {{SnapshotMetricType::ADDED_DATA_FILES, manifest.added_files_count}, + {SnapshotMetricType::ADDED_RECORDS, manifest.added_rows_count}, + {SnapshotMetricType::DELETED_DATA_FILES, manifest.deleted_files_count}, + {SnapshotMetricType::DELETED_RECORDS, manifest.deleted_rows_count}}; + + auto previous_total_files = previous_metrics.find(SnapshotMetricType::TOTAL_DATA_FILES); + if (previous_total_files != previous_metrics.end()) { + int64_t total_files = previous_total_files->second + manifest.added_files_count - manifest.deleted_files_count; + if (total_files >= 0) + metrics[SnapshotMetricType::TOTAL_DATA_FILES] = total_files; + } + + auto previous_total_records = previous_metrics.find(SnapshotMetricType::TOTAL_RECORDS); + if (previous_total_records != previous_metrics.end()) { + int64_t total_records = + previous_total_records->second + manifest.added_rows_count - manifest.deleted_rows_count; + if (total_records >= 0) { + metrics[SnapshotMetricType::TOTAL_RECORDS] = total_records; + } + } + + return metrics; +} + void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, vector &&data_files) { D_ASSERT(!data_files.empty()); @@ -43,6 +72,32 @@ void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, auto manifest_list_path = table_info.BaseFilePath() + "/metadata/snap-" + std::to_string(snapshot_id) + "-" + manifest_list_uuid + ".avro"; + //! Construct the manifest + IcebergManifest manifest; + manifest.manifest_path = manifest_file_path; + manifest.sequence_number = sequence_number; + manifest.content = IcebergManifestContentType::DATA; + manifest.added_files_count = data_files.size(); + manifest.existing_files_count = 0; + manifest.deleted_files_count = 0; + manifest.added_rows_count = 0; + manifest.existing_rows_count = 0; + manifest.deleted_rows_count = 0; + //! TODO: support partitions + manifest.partition_spec_id = 0; + //! manifest.partitions = CreateManifestPartition(); + + //! Add the data files + for (auto &data_file : data_files) { + manifest.added_rows_count += data_file.record_count; + data_file.sequence_number = sequence_number; + if (!manifest.has_min_sequence_number || data_file.sequence_number < manifest.min_sequence_number) { + manifest.min_sequence_number = data_file.sequence_number; + } + manifest.has_min_sequence_number = true; + } + manifest.added_snapshot_id = snapshot_id; + //! Construct the snapshot IcebergSnapshot new_snapshot; new_snapshot.snapshot_id = snapshot_id; @@ -57,43 +112,24 @@ void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, if (!alters.empty()) { auto &last_alter = alters.back().get(); new_snapshot.parent_snapshot_id = last_alter.snapshot.snapshot_id; + new_snapshot.metrics = GetSnapshotMetrics(manifest, last_alter.snapshot.metrics); } else { D_ASSERT(table_info.table_metadata.has_current_snapshot); new_snapshot.parent_snapshot_id = table_info.table_metadata.current_snapshot_id; + new_snapshot.metrics = GetSnapshotMetrics( + manifest, table_info.table_metadata.GetSnapshotById(new_snapshot.parent_snapshot_id)->metrics); } + } else { + // If there was no previous snapshot, default the metrics to start totals at 0 + new_snapshot.metrics = GetSnapshotMetrics(manifest, kMetricsWithEmptyTotals); } + new_manifest_file.data_files.insert(new_manifest_file.data_files.end(), std::make_move_iterator(data_files.begin()), + std::make_move_iterator(data_files.end())); auto add_snapshot = make_uniq(table_info, std::move(new_manifest_file), manifest_list_path, std::move(new_snapshot)); - auto &manifest_file = add_snapshot->manifest_file; - auto &manifest = add_snapshot->manifest; - auto &snapshot = add_snapshot->snapshot; + add_snapshot->manifest = std::move(manifest); - manifest.manifest_path = manifest_file_path; - manifest.sequence_number = sequence_number; - manifest.content = IcebergManifestContentType::DATA; - manifest.added_files_count = data_files.size(); - manifest.existing_files_count = 0; - manifest.deleted_files_count = 0; - manifest.added_rows_count = 0; - manifest.existing_rows_count = 0; - manifest.deleted_rows_count = 0; - //! TODO: support partitions - manifest.partition_spec_id = 0; - //! manifest.partitions = CreateManifestPartition(); - - //! Add the data files - for (auto &data_file : data_files) { - manifest.added_rows_count += data_file.record_count; - data_file.sequence_number = snapshot.sequence_number; - if (!manifest.has_min_sequence_number || data_file.sequence_number < manifest.min_sequence_number) { - manifest.min_sequence_number = data_file.sequence_number; - } - manifest.has_min_sequence_number = true; - } - manifest.added_snapshot_id = snapshot.snapshot_id; - manifest_file.data_files.insert(manifest_file.data_files.end(), std::make_move_iterator(data_files.begin()), - std::make_move_iterator(data_files.end())); alters.push_back(*add_snapshot); updates.push_back(std::move(add_snapshot)); } diff --git a/src/storage/irc_transaction.cpp b/src/storage/irc_transaction.cpp index 26309d483..cea198bbe 100644 --- a/src/storage/irc_transaction.cpp +++ b/src/storage/irc_transaction.cpp @@ -77,6 +77,9 @@ void CommitTableToJSON(yyjson_mut_doc *doc, yyjson_mut_val *root_object, yyjson_mut_obj_add_strcpy(doc, snapshot_json, "manifest-list", snapshot.manifest_list.c_str()); auto summary_json = yyjson_mut_obj_add_obj(doc, snapshot_json, "summary"); yyjson_mut_obj_add_strcpy(doc, summary_json, "operation", snapshot.summary.operation.c_str()); + for (auto &prop : snapshot.summary.additional_properties) { + yyjson_mut_obj_add_strcpy(doc, summary_json, prop.first.c_str(), prop.second.c_str()); + } yyjson_mut_obj_add_uint(doc, snapshot_json, "schema-id", snapshot.schema_id); } else if (update.has_set_snapshot_ref_update) { auto update_json = yyjson_mut_arr_add_obj(doc, updates_array);