-
Notifications
You must be signed in to change notification settings - Fork 81
Initial snapshot summary metrics support #529
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
022c621
e994f85
565674e
bd00ce4
2927dbe
934ce49
19779fc
ca2e8fe
e05ee76
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -18,6 +18,40 @@ static string OperationTypeToString(IcebergSnapshotOperationType type) { | |
| } | ||
| } | ||
|
|
||
| static const std::map<SnapshotMetricType, string> kSnapshotMetricKeys = { | ||
| {SnapshotMetricType::ADDED_DATA_FILES, "added-data-files"}, | ||
| {SnapshotMetricType::ADDED_RECORDS, "added-records"}, | ||
| {SnapshotMetricType::DELETED_DATA_FILES, "deleted-data-files"}, | ||
| {SnapshotMetricType::DELETED_RECORDS, "deleted-records"}, | ||
| {SnapshotMetricType::TOTAL_DATA_FILES, "total-data-files"}, | ||
| {SnapshotMetricType::TOTAL_RECORDS, "total-records"}}; | ||
|
|
||
| static string MetricsTypeToString(SnapshotMetricType type) { | ||
| auto entry = kSnapshotMetricKeys.find(type); | ||
| if (entry == kSnapshotMetricKeys.end()) { | ||
| throw InvalidConfigurationException("Metrics type not implemented: %d", static_cast<uint8_t>(type)); | ||
| } | ||
| return entry->second; | ||
| } | ||
|
|
||
| static IcebergSnapshot::metrics_map_t MetricsFromSummary(const case_insensitive_map_t<string> &snapshot_summary) { | ||
| IcebergSnapshot::metrics_map_t metrics; | ||
| for (auto &entry : kSnapshotMetricKeys) { | ||
| auto it = snapshot_summary.find(entry.second); | ||
| if (it != snapshot_summary.end()) { | ||
| int64_t value; | ||
| try { | ||
| value = std::stoll(it->second); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| } catch (...) { | ||
| // Skip invalid metrics | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Somewhat matches https://github.com/apache/iceberg/blob/06ebf07e671c3ee2b0d0add1c4dc849ab9cbf630/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L818-L820, except we discard invalid metrics at earlier stage |
||
| continue; | ||
| } | ||
| metrics[entry.first] = value; | ||
| } | ||
| } | ||
| return metrics; | ||
| } | ||
|
|
||
| rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const { | ||
| rest_api_objects::Snapshot res; | ||
|
|
||
|
|
@@ -26,6 +60,9 @@ rest_api_objects::Snapshot IcebergSnapshot::ToRESTObject() const { | |
| res.manifest_list = manifest_list; | ||
|
|
||
| res.summary.operation = OperationTypeToString(operation); | ||
| for (auto &entry : metrics) { | ||
| res.summary.additional_properties[MetricsTypeToString(entry.first)] = std::to_string(entry.second); | ||
| } | ||
|
|
||
| if (!has_parent_snapshot) { | ||
| res.has_parent_snapshot_id = false; | ||
|
|
@@ -57,6 +94,8 @@ IcebergSnapshot IcebergSnapshot::ParseSnapshot(rest_api_objects::Snapshot &snaps | |
| D_ASSERT(snapshot.has_schema_id); | ||
| ret.schema_id = snapshot.schema_id; | ||
| ret.manifest_list = snapshot.manifest_list; | ||
| ret.metrics = MetricsFromSummary(snapshot.summary.additional_properties); | ||
|
|
||
| return ret; | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,35 @@ static int64_t NewSnapshotId() { | |
| return random_number; | ||
| } | ||
|
|
||
| static const IcebergSnapshot::metrics_map_t kMetricsWithEmptyTotals {{SnapshotMetricType::TOTAL_DATA_FILES, 0}, | ||
| {SnapshotMetricType::TOTAL_RECORDS, 0}}; | ||
|
|
||
| static IcebergSnapshot::metrics_map_t GetSnapshotMetrics(const IcebergManifest &manifest, | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Because DuckDB fast appends a manifest for new snapshot, this implementation is simpler than the Java side |
||
| const IcebergSnapshot::metrics_map_t &previous_metrics) { | ||
| IcebergSnapshot::metrics_map_t metrics {{SnapshotMetricType::ADDED_DATA_FILES, manifest.added_files_count}, | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| {SnapshotMetricType::ADDED_RECORDS, manifest.added_rows_count}, | ||
| {SnapshotMetricType::DELETED_DATA_FILES, manifest.deleted_files_count}, | ||
| {SnapshotMetricType::DELETED_RECORDS, manifest.deleted_rows_count}}; | ||
|
|
||
| auto previous_total_files = previous_metrics.find(SnapshotMetricType::TOTAL_DATA_FILES); | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See https://github.com/apache/iceberg/blob/06ebf07e671c3ee2b0d0add1c4dc849ab9cbf630/core/src/main/java/org/apache/iceberg/SnapshotProducer.java#L792 for this combining process with previous snapshot summary (again simpler for us since we can just use the manifest) |
||
| if (previous_total_files != previous_metrics.end()) { | ||
| int64_t total_files = previous_total_files->second + manifest.added_files_count - manifest.deleted_files_count; | ||
| if (total_files >= 0) | ||
| metrics[SnapshotMetricType::TOTAL_DATA_FILES] = total_files; | ||
| } | ||
|
|
||
| auto previous_total_records = previous_metrics.find(SnapshotMetricType::TOTAL_RECORDS); | ||
| if (previous_total_records != previous_metrics.end()) { | ||
| int64_t total_records = | ||
| previous_total_records->second + manifest.added_rows_count - manifest.deleted_rows_count; | ||
| if (total_records >= 0) { | ||
| metrics[SnapshotMetricType::TOTAL_RECORDS] = total_records; | ||
| } | ||
| } | ||
|
|
||
| return metrics; | ||
| } | ||
|
|
||
| void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, | ||
| vector<IcebergManifestEntry> &&data_files) { | ||
| D_ASSERT(!data_files.empty()); | ||
|
|
@@ -43,6 +72,32 @@ void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, | |
| auto manifest_list_path = table_info.BaseFilePath() + "/metadata/snap-" + std::to_string(snapshot_id) + "-" + | ||
| manifest_list_uuid + ".avro"; | ||
|
|
||
| //! Construct the manifest | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Rearranging - wanted to construct manifest fully first so it can be then used to construct the snapshot (the snapshot now depends on it for metrics). Just thought this read nicer now |
||
| IcebergManifest manifest; | ||
| manifest.manifest_path = manifest_file_path; | ||
| manifest.sequence_number = sequence_number; | ||
| manifest.content = IcebergManifestContentType::DATA; | ||
| manifest.added_files_count = data_files.size(); | ||
| manifest.existing_files_count = 0; | ||
| manifest.deleted_files_count = 0; | ||
| manifest.added_rows_count = 0; | ||
| manifest.existing_rows_count = 0; | ||
| manifest.deleted_rows_count = 0; | ||
| //! TODO: support partitions | ||
| manifest.partition_spec_id = 0; | ||
| //! manifest.partitions = CreateManifestPartition(); | ||
|
|
||
| //! Add the data files | ||
| for (auto &data_file : data_files) { | ||
| manifest.added_rows_count += data_file.record_count; | ||
| data_file.sequence_number = sequence_number; | ||
| if (!manifest.has_min_sequence_number || data_file.sequence_number < manifest.min_sequence_number) { | ||
| manifest.min_sequence_number = data_file.sequence_number; | ||
| } | ||
| manifest.has_min_sequence_number = true; | ||
| } | ||
| manifest.added_snapshot_id = snapshot_id; | ||
|
|
||
| //! Construct the snapshot | ||
| IcebergSnapshot new_snapshot; | ||
| new_snapshot.snapshot_id = snapshot_id; | ||
|
|
@@ -57,43 +112,24 @@ void IcebergTransactionData::AddSnapshot(IcebergSnapshotOperationType operation, | |
| if (!alters.empty()) { | ||
| auto &last_alter = alters.back().get(); | ||
| new_snapshot.parent_snapshot_id = last_alter.snapshot.snapshot_id; | ||
| new_snapshot.metrics = GetSnapshotMetrics(manifest, last_alter.snapshot.metrics); | ||
| } else { | ||
| D_ASSERT(table_info.table_metadata.has_current_snapshot); | ||
| new_snapshot.parent_snapshot_id = table_info.table_metadata.current_snapshot_id; | ||
| new_snapshot.metrics = GetSnapshotMetrics( | ||
| manifest, table_info.table_metadata.GetSnapshotById(new_snapshot.parent_snapshot_id)->metrics); | ||
| } | ||
| } else { | ||
| // If there was no previous snapshot, default the metrics to start totals at 0 | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| new_snapshot.metrics = GetSnapshotMetrics(manifest, kMetricsWithEmptyTotals); | ||
| } | ||
|
|
||
| new_manifest_file.data_files.insert(new_manifest_file.data_files.end(), std::make_move_iterator(data_files.begin()), | ||
| std::make_move_iterator(data_files.end())); | ||
| auto add_snapshot = make_uniq<IcebergAddSnapshot>(table_info, std::move(new_manifest_file), manifest_list_path, | ||
| std::move(new_snapshot)); | ||
| auto &manifest_file = add_snapshot->manifest_file; | ||
| auto &manifest = add_snapshot->manifest; | ||
| auto &snapshot = add_snapshot->snapshot; | ||
| add_snapshot->manifest = std::move(manifest); | ||
|
|
||
| manifest.manifest_path = manifest_file_path; | ||
| manifest.sequence_number = sequence_number; | ||
| manifest.content = IcebergManifestContentType::DATA; | ||
| manifest.added_files_count = data_files.size(); | ||
| manifest.existing_files_count = 0; | ||
| manifest.deleted_files_count = 0; | ||
| manifest.added_rows_count = 0; | ||
| manifest.existing_rows_count = 0; | ||
| manifest.deleted_rows_count = 0; | ||
| //! TODO: support partitions | ||
| manifest.partition_spec_id = 0; | ||
| //! manifest.partitions = CreateManifestPartition(); | ||
|
|
||
| //! Add the data files | ||
| for (auto &data_file : data_files) { | ||
| manifest.added_rows_count += data_file.record_count; | ||
| data_file.sequence_number = snapshot.sequence_number; | ||
| if (!manifest.has_min_sequence_number || data_file.sequence_number < manifest.min_sequence_number) { | ||
| manifest.min_sequence_number = data_file.sequence_number; | ||
| } | ||
| manifest.has_min_sequence_number = true; | ||
| } | ||
| manifest.added_snapshot_id = snapshot.snapshot_id; | ||
| manifest_file.data_files.insert(manifest_file.data_files.end(), std::make_move_iterator(data_files.begin()), | ||
| std::make_move_iterator(data_files.end())); | ||
| alters.push_back(*add_snapshot); | ||
| updates.push_back(std::move(add_snapshot)); | ||
| } | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -77,6 +77,9 @@ void CommitTableToJSON(yyjson_mut_doc *doc, yyjson_mut_val *root_object, | |
| yyjson_mut_obj_add_strcpy(doc, snapshot_json, "manifest-list", snapshot.manifest_list.c_str()); | ||
| auto summary_json = yyjson_mut_obj_add_obj(doc, snapshot_json, "summary"); | ||
| yyjson_mut_obj_add_strcpy(doc, summary_json, "operation", snapshot.summary.operation.c_str()); | ||
| for (auto &prop : snapshot.summary.additional_properties) { | ||
|
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is similar to what's done to serialize
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Keep in mind that the
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks, yeah looking at usages I also think this holds. This contract was also relied on by the |
||
| yyjson_mut_obj_add_strcpy(doc, summary_json, prop.first.c_str(), prop.second.c_str()); | ||
| } | ||
| yyjson_mut_obj_add_uint(doc, snapshot_json, "schema-id", snapshot.schema_id); | ||
| } else if (update.has_set_snapshot_ref_update) { | ||
| auto update_json = yyjson_mut_arr_add_obj(doc, updates_array); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: why the
k?