Skip to content

Commit 9a87436

Browse files
authored
databricks: include spill_to_disk_bytes into disk_write_bytes metric (#208)
1 parent 43c3b41 commit 9a87436

1 file changed

Lines changed: 42 additions & 19 deletions

File tree

  • system-adapters/databricks/src

system-adapters/databricks/src/main.rs

Lines changed: 42 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -928,8 +928,8 @@ impl DatabricksAdapter {
928928
}
929929
}
930930

931-
/// Sum `read_bytes` + `write_remote_bytes` from query history for all
932-
/// FINISHED queries on this warehouse since `start_time_ms`.
931+
/// Sum `read_bytes` and `write_remote_bytes + spill_to_disk_bytes` from query
932+
/// history for all FINISHED queries on this warehouse since `start_time_ms`.
933933
///
934934
/// Uses the REST Query History API (`/api/2.0/sql/history/queries`).
935935
///
@@ -942,15 +942,18 @@ impl DatabricksAdapter {
942942
}
943943

944944
/// REST path: paginate through `/api/2.0/sql/history/queries` and sum
945-
/// per-query `metrics.read_bytes` and `metrics.write_remote_bytes`.
945+
/// per-query `metrics.read_bytes` and `metrics.write_remote_bytes + metrics.spill_to_disk_bytes`.
946946
async fn sum_query_history_io_rest(&self, start_time_ms: u64) -> Result<(u64, u64)> {
947947
let history_url = format!(
948948
"https://{}/api/2.0/sql/history/queries",
949949
self.config.endpoint
950950
);
951951

952-
let mut total_read: u64 = 0;
953-
let mut total_write: u64 = 0;
952+
let mut total_read_bytes: u64 = 0;
953+
let mut total_read_remote_bytes: u64 = 0;
954+
let mut total_read_cache_bytes: u64 = 0;
955+
let mut total_write_remote_bytes: u64 = 0;
956+
let mut total_spill_to_disk_bytes: u64 = 0;
954957
let mut page_token: Option<String> = None;
955958

956959
loop {
@@ -988,8 +991,11 @@ impl DatabricksAdapter {
988991

989992
for entry in &body.res {
990993
if let Some(ref m) = entry.metrics {
991-
total_read += m.read_bytes.unwrap_or(0);
992-
total_write += m.write_remote_bytes.unwrap_or(0);
994+
total_read_bytes += m.read_bytes.unwrap_or(0);
995+
total_read_remote_bytes += m.read_remote_bytes.unwrap_or(0);
996+
total_read_cache_bytes += m.read_cache_bytes.unwrap_or(0);
997+
total_write_remote_bytes += m.write_remote_bytes.unwrap_or(0);
998+
total_spill_to_disk_bytes += m.spill_to_disk_bytes.unwrap_or(0);
993999
}
9941000
}
9951001

@@ -1000,7 +1006,15 @@ impl DatabricksAdapter {
10001006
}
10011007
}
10021008

1003-
Ok((total_read, total_write))
1009+
let total_write_bytes = total_write_remote_bytes + total_spill_to_disk_bytes;
1010+
1011+
eprintln!(
1012+
"[databricks-adapter] query history I/O breakdown: \
1013+
total_read_bytes={total_read_bytes} (total_read_remote_bytes={total_read_remote_bytes}, total_read_cache_bytes={total_read_cache_bytes}), \
1014+
total_write_bytes={total_write_bytes} (total_write_remote_bytes={total_write_remote_bytes}, total_spill_to_disk_bytes={total_spill_to_disk_bytes})"
1015+
);
1016+
1017+
Ok((total_read_bytes, total_write_bytes))
10041018
}
10051019

10061020
async fn ensure_cluster_ready(&self) -> Result<(String, bool)> {
@@ -1796,11 +1810,20 @@ struct QueryHistoryMetrics {
17961810
/// and local SSD/disk cache (`read_cache_bytes`). Maps to `disk_read_bytes`.
17971811
#[serde(default)]
17981812
read_bytes: Option<u64>,
1799-
/// Bytes written to remote cloud storage (S3/ADLS/GCS). This is the only write metric
1800-
/// available in the REST API — non-zero for DDL/DML that materializes data (e.g. CTAS).
1801-
/// Maps to `disk_write_bytes`.
1813+
/// Bytes read from remote cloud storage (S3/ADLS/GCS).
1814+
#[serde(default)]
1815+
read_remote_bytes: Option<u64>,
1816+
/// Bytes read from the local SSD/disk cache.
1817+
#[serde(default)]
1818+
read_cache_bytes: Option<u64>,
1819+
/// Bytes written to remote cloud storage (S3/ADLS/GCS). Non-zero for DDL/DML that
1820+
/// materializes data (e.g. CTAS). Summed with `spill_to_disk_bytes` for `disk_write_bytes`.
18021821
#[serde(default)]
18031822
write_remote_bytes: Option<u64>,
1823+
/// Bytes temporarily written to local disk when query execution exceeds available memory.
1824+
/// Summed with `write_remote_bytes` for `disk_write_bytes`.
1825+
#[serde(default)]
1826+
spill_to_disk_bytes: Option<u64>,
18041827
}
18051828

18061829
/// Response from GET /api/2.0/sql/history/queries
@@ -1925,11 +1948,10 @@ impl Handler for DatabricksAdapter {
19251948
// Create managed UC tables (sources for synced tables) via SQL Warehouse.
19261949
// Drop any stale tables from a previous failed run first to ensure idempotent setup.
19271950
for (table_name, dataset_cfg) in &datasets {
1928-
let drop_sql = format!(
1929-
"DROP TABLE IF EXISTS {}",
1930-
self.table_full_name(table_name)
1951+
let drop_sql = format!("DROP TABLE IF EXISTS {}", self.table_full_name(table_name));
1952+
eprintln!(
1953+
"[databricks-adapter] dropping stale managed table (if exists) '{table_name}': {drop_sql}"
19311954
);
1932-
eprintln!("[databricks-adapter] dropping stale managed table (if exists) '{table_name}': {drop_sql}");
19331955
self.execute_sql_statement(&drop_sql)
19341956
.await
19351957
.map_err(|e| format!("Failed to drop stale managed table '{table_name}': {e}"))?;
@@ -2257,13 +2279,14 @@ impl Handler for DatabricksAdapter {
22572279
}
22582280
}
22592281

2260-
// Sum read_bytes and write_bytes from all queries since the run started.
2261-
// On periodic scrapes this is best-effort (Query History has ~5 min lag).
2262-
// On final scrape the marker wait above ensures completeness.
2282+
// Sum read_bytes and (write_remote_bytes + spill_to_disk_bytes) from all
2283+
// queries since the run started. On periodic scrapes this is best-effort
2284+
// (Query History has ~5 min lag). On final scrape the marker wait above
2285+
// ensures completeness.
22632286
match self.sum_query_history_io(started_at_ms).await {
22642287
Ok((total_read, total_write)) => {
22652288
eprintln!(
2266-
"[databricks-adapter] query history totals: read_bytes={total_read} write_remote_bytes={total_write}"
2289+
"[databricks-adapter] query history totals: read_bytes={total_read} write_bytes={total_write}"
22672290
);
22682291
resource.disk_read_bytes = Some(total_read);
22692292
resource.disk_write_bytes = Some(total_write);

0 commit comments

Comments
 (0)