Skip to content

Commit 75fd528

Browse files
authored
Simplify iceberg persistence stats. (#2142)
1 parent 50097c5 commit 75fd528

File tree

3 files changed

+56
-71
lines changed

3 files changed

+56
-71
lines changed

src/moonlink/src/observability/iceberg_persistence.rs

Lines changed: 45 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,7 @@ use crate::observability::latency_guard::LatencyGuard;
33
use opentelemetry::metrics::Histogram;
44
use opentelemetry::{global, KeyValue};
55

6-
#[derive(Debug, Clone, Copy)]
7-
pub(crate) enum IcebergPersistenceStage {
6+
enum PersistenceStage {
87
Overall,
98
DataFiles,
109
FileIndices,
@@ -14,35 +13,71 @@ pub(crate) enum IcebergPersistenceStage {
1413

1514
#[derive(Debug)]
1615
pub(crate) struct IcebergPersistenceStats {
16+
pub(crate) overall: IcebergPersistenceSingleStats,
17+
pub(crate) sync_data_files: IcebergPersistenceSingleStats,
18+
pub(crate) sync_file_indices: IcebergPersistenceSingleStats,
19+
pub(crate) sync_deletion_vectors: IcebergPersistenceSingleStats,
20+
pub(crate) transaction_commit: IcebergPersistenceSingleStats,
21+
}
22+
23+
impl IcebergPersistenceStats {
24+
pub(crate) fn new(mooncake_table_id: String) -> Self {
25+
Self {
26+
overall: IcebergPersistenceSingleStats::new(
27+
mooncake_table_id.to_string(),
28+
PersistenceStage::Overall,
29+
),
30+
sync_data_files: IcebergPersistenceSingleStats::new(
31+
mooncake_table_id.to_string(),
32+
PersistenceStage::DataFiles,
33+
),
34+
sync_file_indices: IcebergPersistenceSingleStats::new(
35+
mooncake_table_id.to_string(),
36+
PersistenceStage::FileIndices,
37+
),
38+
sync_deletion_vectors: IcebergPersistenceSingleStats::new(
39+
mooncake_table_id.to_string(),
40+
PersistenceStage::DeletionVectors,
41+
),
42+
transaction_commit: IcebergPersistenceSingleStats::new(
43+
mooncake_table_id.to_string(),
44+
PersistenceStage::TransactionCommit,
45+
),
46+
}
47+
}
48+
}
49+
50+
#[derive(Debug)]
51+
pub(crate) struct IcebergPersistenceSingleStats {
1752
mooncake_table_id: String,
1853
latency: Histogram<u64>,
1954
}
2055

21-
impl IcebergPersistenceStats {
22-
pub(crate) fn new(mooncake_table_id: String, stats_type: IcebergPersistenceStage) -> Self {
56+
impl IcebergPersistenceSingleStats {
57+
fn new(mooncake_table_id: String, stats_type: PersistenceStage) -> Self {
2358
let meter = global::meter("iceberg_persistence");
2459
let latency = match stats_type {
25-
IcebergPersistenceStage::Overall => meter
60+
PersistenceStage::Overall => meter
2661
.u64_histogram("snapshot_synchronization_latency")
2762
.with_description("Latency (ms) for snapshot synchronization")
2863
.with_boundaries(vec![50.0, 100.0, 200.0, 300.0, 400.0, 500.0])
2964
.build(),
30-
IcebergPersistenceStage::DataFiles => meter
65+
PersistenceStage::DataFiles => meter
3166
.u64_histogram("sync_data_files_latency")
3267
.with_description("Latency (ms) for data files synchronization")
3368
.with_boundaries(vec![50.0, 100.0, 200.0, 300.0, 400.0, 500.0])
3469
.build(),
35-
IcebergPersistenceStage::FileIndices => meter
70+
PersistenceStage::FileIndices => meter
3671
.u64_histogram("sync_file_indices_latency")
3772
.with_description("Latency (ms) for file indices synchronization")
3873
.with_boundaries(vec![50.0, 100.0, 200.0, 300.0, 400.0, 500.0])
3974
.build(),
40-
IcebergPersistenceStage::DeletionVectors => meter
75+
PersistenceStage::DeletionVectors => meter
4176
.u64_histogram("sync_deletion_vectors_latency")
4277
.with_description("Latency (ms) for deletion vectors synchronization")
4378
.with_boundaries(vec![50.0, 100.0, 200.0, 300.0, 400.0, 500.0])
4479
.build(),
45-
IcebergPersistenceStage::TransactionCommit => meter
80+
PersistenceStage::TransactionCommit => meter
4681
.u64_histogram("transaction_commit_latency")
4782
.with_description("Latency (ms) for transaction commit")
4883
.with_boundaries(vec![50.0, 100.0, 200.0, 300.0, 400.0, 500.0])
@@ -56,7 +91,7 @@ impl IcebergPersistenceStats {
5691
}
5792
}
5893

59-
impl BaseLatencyExporter for IcebergPersistenceStats {
94+
impl BaseLatencyExporter for IcebergPersistenceSingleStats {
6095
fn start<'a>(&'a self) -> LatencyGuard<'a> {
6196
LatencyGuard::new(self.mooncake_table_id.clone(), self)
6297
}

src/moonlink/src/storage/iceberg/iceberg_table_manager.rs

Lines changed: 5 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use crate::observability::iceberg_persistence::{IcebergPersistenceStage, IcebergPersistenceStats};
1+
use crate::observability::iceberg_persistence::IcebergPersistenceStats;
22
use crate::observability::iceberg_table_recovery::IcebergTableRecoveryStats;
33
use crate::observability::latency_exporter::BaseLatencyExporter;
44
use crate::storage::cache::object_storage::base_cache::CacheTrait;
@@ -74,20 +74,8 @@ pub struct IcebergTableManager {
7474
/// Maps from remote data file path to its file id.
7575
pub(crate) remote_data_file_to_file_id: HashMap<String, FileId>,
7676

77-
/// Iceberg persistence stats for overall snapshot synchronization.
78-
pub(crate) persistence_stats_overall: Arc<IcebergPersistenceStats>,
79-
80-
/// Iceberg persistence stats for data files synchronization.
81-
pub(crate) persistence_stats_sync_data_files: Arc<IcebergPersistenceStats>,
82-
83-
/// Iceberg persistence stats for deletion vectors synchronization.
84-
pub(crate) persistence_stats_sync_deletion_vectors: Arc<IcebergPersistenceStats>,
85-
86-
/// Iceberg persistence stats for file indices synchronization.
87-
pub(crate) persistence_stats_sync_file_indices: Arc<IcebergPersistenceStats>,
88-
89-
/// Iceberg persistence stats for transaction commit.
90-
pub(crate) persistence_stats_transaction_commit: Arc<IcebergPersistenceStats>,
77+
/// Iceberg persistence stats.
78+
pub(crate) persistence_stats: Arc<IcebergPersistenceStats>,
9179
}
9280

9381
impl IcebergTableManager {
@@ -112,26 +100,7 @@ impl IcebergTableManager {
112100
persisted_data_files: HashMap::new(),
113101
persisted_file_indices: HashMap::new(),
114102
remote_data_file_to_file_id: HashMap::new(),
115-
persistence_stats_overall: Arc::new(IcebergPersistenceStats::new(
116-
mooncake_table_id.clone(),
117-
IcebergPersistenceStage::Overall,
118-
)),
119-
persistence_stats_sync_data_files: Arc::new(IcebergPersistenceStats::new(
120-
mooncake_table_id.clone(),
121-
IcebergPersistenceStage::DataFiles,
122-
)),
123-
persistence_stats_sync_file_indices: Arc::new(IcebergPersistenceStats::new(
124-
mooncake_table_id.clone(),
125-
IcebergPersistenceStage::FileIndices,
126-
)),
127-
persistence_stats_sync_deletion_vectors: Arc::new(IcebergPersistenceStats::new(
128-
mooncake_table_id.clone(),
129-
IcebergPersistenceStage::DeletionVectors,
130-
)),
131-
persistence_stats_transaction_commit: Arc::new(IcebergPersistenceStats::new(
132-
mooncake_table_id,
133-
IcebergPersistenceStage::TransactionCommit,
134-
)),
103+
persistence_stats: Arc::new(IcebergPersistenceStats::new(mooncake_table_id)),
135104
})
136105
}
137106

@@ -160,26 +129,7 @@ impl IcebergTableManager {
160129
persisted_data_files: HashMap::new(),
161130
persisted_file_indices: HashMap::new(),
162131
remote_data_file_to_file_id: HashMap::new(),
163-
persistence_stats_overall: Arc::new(IcebergPersistenceStats::new(
164-
mooncake_table_id.clone(),
165-
IcebergPersistenceStage::Overall,
166-
)),
167-
persistence_stats_sync_data_files: Arc::new(IcebergPersistenceStats::new(
168-
mooncake_table_id.clone(),
169-
IcebergPersistenceStage::DataFiles,
170-
)),
171-
persistence_stats_sync_file_indices: Arc::new(IcebergPersistenceStats::new(
172-
mooncake_table_id.clone(),
173-
IcebergPersistenceStage::FileIndices,
174-
)),
175-
persistence_stats_sync_deletion_vectors: Arc::new(IcebergPersistenceStats::new(
176-
mooncake_table_id.clone(),
177-
IcebergPersistenceStage::DeletionVectors,
178-
)),
179-
persistence_stats_transaction_commit: Arc::new(IcebergPersistenceStats::new(
180-
mooncake_table_id,
181-
IcebergPersistenceStage::TransactionCommit,
182-
)),
132+
persistence_stats: Arc::new(IcebergPersistenceStats::new(mooncake_table_id)),
183133
})
184134
}
185135

src/moonlink/src/storage/iceberg/iceberg_table_syncer.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ impl IcebergTableManager {
242242
});
243243
}
244244
// Record data files synchronization latency.
245-
let _guard = self.persistence_stats_sync_data_files.start();
245+
let _guard = self.persistence_stats.sync_data_files.start();
246246
let mut local_data_files_to_remote = HashMap::with_capacity(new_data_files.len());
247247
let mut new_remote_data_files = Vec::with_capacity(new_data_files.len());
248248
let mut new_iceberg_data_files = Vec::with_capacity(new_data_files.len());
@@ -496,7 +496,7 @@ impl IcebergTableManager {
496496
evicted_files_to_delete: Vec::new(),
497497
});
498498
}
499-
let _guard = self.persistence_stats_sync_deletion_vectors.start();
499+
let _guard = self.persistence_stats.sync_deletion_vectors.start();
500500
let mut puffin_deletion_blobs = HashMap::with_capacity(new_deletion_logs.len());
501501
let mut evicted_files_to_delete = vec![];
502502
let prepared: Vec<IcebergResult<PreparedDeletionVectorBlob>>;
@@ -618,7 +618,7 @@ impl IcebergTableManager {
618618
return Ok(HashMap::new());
619619
}
620620
// Record file indices synchronization latency.
621-
let _guard = self.persistence_stats_sync_file_indices.start();
621+
let _guard = self.persistence_stats.sync_file_indices.start();
622622
let mut local_index_file_to_remote = HashMap::new();
623623

624624
let iceberg_table = self.iceberg_table.as_ref().unwrap();
@@ -726,8 +726,8 @@ impl IcebergTableManager {
726726
file_params: PersistenceFileParams,
727727
) -> Result<PersistenceResult> {
728728
// Start recording overall snapshot synchronization latency.
729-
let persistence_stats_overall = self.persistence_stats_overall.clone();
730-
let _guard = persistence_stats_overall.start();
729+
let persistence_stats = self.persistence_stats.clone();
730+
let _guard = persistence_stats.overall.start();
731731

732732
// Initialize iceberg table on access.
733733
self.initialize_iceberg_table_for_once().await?;
@@ -819,7 +819,7 @@ impl IcebergTableManager {
819819

820820
let updated_iceberg_table = {
821821
// Start recording transaction commit latency.
822-
let _guard = self.persistence_stats_transaction_commit.start();
822+
let _guard = self.persistence_stats.transaction_commit.start();
823823
// Commit the transaction.
824824
txn.commit(&*self.catalog).await?
825825
};

0 commit comments

Comments
 (0)