Skip to content

Commit b26c7fc

Browse files
sgrebnov and peasee authored
Add SUT metrics for Databricks SQL Warehouse: disk I/O, num_active_sessions, compute nodes count (#180)
* Databricks: resolve UTF8 opaque types to fix schema mismatch
* Add Databricks `num_active_sessions` metric
* Enable scrape_sut_metrics
* Add num_compute_nodes
* Update
* Update
* disk_read_bytes and disk_write_bytes metrics
* Update

---------

Co-authored-by: William <98815791+peasee@users.noreply.github.com>
1 parent ee3564c commit b26c7fc

7 files changed

Lines changed: 441 additions & 10 deletions

File tree

.github/workflows/run_spicebench.yml

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,11 @@ on:
5353
required: false
5454
default: false
5555
type: boolean
56+
scrape_sut_metrics:
57+
description: 'Enable periodic SUT metrics collection via the system adapter (maps to spicebench --scrape-sut-metrics)'
58+
required: false
59+
default: true
60+
type: boolean
5661
scheduler_state_location:
5762
description: 'S3 URI for shared scheduler state (e.g. s3://bucket/scheduler-state/). If empty, scheduler state is not configured.'
5863
required: false
@@ -248,6 +253,7 @@ jobs:
248253
SCHEDULER_STATE_LOCATION: ${{ github.event.inputs.scheduler_state_location || '' }}
249254
VALIDATE_CHECKPOINT_RESULTS: ${{ github.event.inputs.validate_checkpoint_results || 'false' }}
250255
ENABLE_MODULE_DEBUG_LOGGING: ${{ github.event.inputs.enable_module_debug_logging || 'false' }}
256+
SCRAPE_SUT_METRICS: ${{ github.event.inputs.scrape_sut_metrics || 'true' }}
251257
SPICEAI_BENCHMARK_METRICS_KEY: ${{ secrets.SPICEAI_BENCHMARK_METRICS_KEY }}
252258
MINIO_ENDPOINT: ${{ secrets.MINIO_ENDPOINT }}
253259
AWS_ACCESS_KEY_ID: ${{ secrets.MINIO_ACCESS_KEY_ID }}
@@ -300,6 +306,11 @@ jobs:
300306
SCHEDULER_ARGS="--scheduler-state-location ${SCHEDULER_STATE_LOCATION}"
301307
fi
302308
309+
SUT_METRICS_ARGS=""
310+
if [ "${SCRAPE_SUT_METRICS}" = "true" ]; then
311+
SUT_METRICS_ARGS="--scrape-sut-metrics"
312+
fi
313+
303314
if [ "${SYSTEM_UNDER_TEST_PREFIX}" = "databricks" ]; then
304315
ADAPTER_CMD="${HOME}/.spice/bin/databricks-system-adapter"
305316
ADAPTER_ARGS="stdio"
@@ -335,11 +346,11 @@ jobs:
335346
--concurrency "${NUM_QUERY_CLIENTS}" \
336347
--scenario "${SCENARIO}" \
337348
--executor-instance-type "${EXECUTOR_INSTANCE_TYPE}" \
338-
--scrape-sut-metrics \
339349
${ETL_ARGS} \
340350
${ETL_SINK_ARGS} \
341351
${VALIDATION_ARGS} \
342352
${SCHEDULER_ARGS} \
353+
${SUT_METRICS_ARGS} \
343354
--system-adapter-stdio-cmd "${ADAPTER_CMD}" \
344355
--system-adapter-stdio-args "${ADAPTER_ARGS}" \
345356
${ADAPTER_ENVS} \

crates/system-adapter-protocol/src/client.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -212,8 +212,15 @@ impl Client {
212212
}
213213

214214
/// Collect current metrics from the system under test
215-
pub async fn metrics(&mut self, run_id: uuid::Uuid) -> Result<crate::MetricsResponse> {
216-
let request = crate::MetricsRequest { run_id };
215+
pub async fn metrics(
216+
&mut self,
217+
run_id: uuid::Uuid,
218+
final_scrape: bool,
219+
) -> Result<crate::MetricsResponse> {
220+
let request = crate::MetricsRequest {
221+
run_id,
222+
final_scrape,
223+
};
217224
let rpc_request = JsonRpcRequest::new(1, crate::methods::METRICS, request);
218225
let response = self.call_typed(rpc_request).await?;
219226
response

crates/system-adapter-protocol/src/lib.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -217,6 +217,11 @@ pub struct TeardownResponse {
217217
pub struct MetricsRequest {
218218
/// Unique identifier for the benchmark run
219219
pub run_id: Uuid,
220+
/// When true, this is the final metrics collection after the benchmark run
221+
/// has finished. Adapters may perform heavier queries (e.g. Query History)
222+
/// that would be too slow for periodic scraping.
223+
#[serde(default)]
224+
pub final_scrape: bool,
220225
}
221226

222227
/// Resource utilization snapshot from the system under test
@@ -240,6 +245,9 @@ pub struct ResourceMetrics {
240245
/// Cumulative disk write operations
241246
#[serde(skip_serializing_if = "Option::is_none")]
242247
pub disk_write_iops: Option<u64>,
248+
/// Number of active compute nodes / clusters backing the SUT
249+
#[serde(skip_serializing_if = "Option::is_none")]
250+
pub num_compute_nodes: Option<u64>,
243251
}
244252

245253
/// Ingestion progress snapshot from the system under test

crates/system-adapter-protocol/src/server.rs

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -86,9 +86,15 @@ pub trait Handler: Send + Sync {
8686
///
8787
/// Called periodically by spicebench when `--scrape-sut-metrics` is enabled.
8888
/// Returns a snapshot of resource utilization and ingestion progress.
89+
/// When `final_scrape` is true, the benchmark run has finished and the
90+
/// adapter may perform heavier queries (e.g. Query History aggregation).
8991
/// Default implementation returns empty metrics.
90-
async fn metrics(&mut self, run_id: Uuid) -> std::result::Result<MetricsResponse, String> {
91-
let _ = run_id;
92+
async fn metrics(
93+
&mut self,
94+
run_id: Uuid,
95+
final_scrape: bool,
96+
) -> std::result::Result<MetricsResponse, String> {
97+
let _ = (run_id, final_scrape);
9298
Ok(MetricsResponse::default())
9399
}
94100

@@ -263,7 +269,7 @@ impl<H: Handler> Server<H> {
263269
Ok(r) => r,
264270
Err(e) => return e,
265271
};
266-
Self::handler_response(self.handler.metrics(req.run_id).await, id)
272+
Self::handler_response(self.handler.metrics(req.run_id, req.final_scrape).await, id)
267273
}
268274

269275
async fn handle_rpc_methods(&mut self, id: serde_json::Value) -> serde_json::Value {
@@ -302,7 +308,11 @@ mod tests {
302308
Ok(TeardownResponse { ok: true })
303309
}
304310

305-
async fn metrics(&mut self, _run_id: Uuid) -> std::result::Result<MetricsResponse, String> {
311+
async fn metrics(
312+
&mut self,
313+
_run_id: Uuid,
314+
_final_scrape: bool,
315+
) -> std::result::Result<MetricsResponse, String> {
306316
Ok(MetricsResponse::default())
307317
}
308318
}

src/commands/load/mod.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,9 @@ fn record_sut_metrics(
153153
if let Some(v) = response.ingestion.active_connections {
154154
crate::metrics::ACTIVE_CONNECTIONS.record(v, attributes);
155155
}
156+
if let Some(v) = response.resource.num_compute_nodes {
157+
crate::metrics::NUM_COMPUTE_NODES.record(v, attributes);
158+
}
156159
}
157160

158161
/// Spawn a task that periodically scrapes SUT metrics from the system adapter.
@@ -180,7 +183,7 @@ fn spawn_sut_metrics_scraper(
180183
loop {
181184
tokio::select! {
182185
_ = ticker.tick() => {
183-
let metrics_result = adapter.lock().await.metrics(run_id).await;
186+
let metrics_result = adapter.lock().await.metrics(run_id, false).await;
184187
match metrics_result {
185188
Ok(resp) => {
186189
log_sut_metrics_snapshot(&resp);
@@ -205,7 +208,7 @@ fn spawn_sut_metrics_scraper(
205208
}
206209
() = token.cancelled() => {
207210
// Final scrape before exiting
208-
if let Ok(resp) = adapter.lock().await.metrics(run_id).await {
211+
if let Ok(resp) = adapter.lock().await.metrics(run_id, true).await {
209212
log_sut_metrics_snapshot(&resp);
210213
record_sut_metrics(
211214
&resp,

src/metrics.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,14 @@ pub static ACTIVE_CONNECTIONS: LazyLock<Gauge<u64>> = LazyLock::new(|| {
141141
.build()
142142
});
143143

144+
pub static NUM_COMPUTE_NODES: LazyLock<Gauge<u64>> = LazyLock::new(|| {
145+
meter()
146+
.u64_gauge("num_compute_nodes")
147+
.with_description("Number of active compute nodes / clusters backing the SUT.")
148+
.with_unit("nodes")
149+
.build()
150+
});
151+
144152
// --- Efficiency ---
145153

146154
pub static EFFICIENCY_QUERIES_PER_CORE: LazyLock<Gauge<f64>> = LazyLock::new(|| {

0 commit comments

Comments
 (0)