Skip to content

Commit 523a2fd

Browse files
feat: System adapter SetupResponse supports specifying catalog/namespace path (#137)
* feat: System adapter SetupResponse supports specifying catalog/namespace path
* chore: auto-fix cargo fmt + clippy
* Update lib.rs
* feat: Add ETL sink type to system adapter setup request

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
1 parent 956911c commit 523a2fd

8 files changed

Lines changed: 127 additions & 14 deletions

File tree

crates/system-adapter-protocol/src/client.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -181,11 +181,13 @@ impl Client {
181181
run_id: uuid::Uuid,
182182
metadata: std::collections::HashMap<String, serde_json::Value>,
183183
datasets: std::collections::HashMap<String, crate::DatasetConfig>,
184+
etl_sink_type: Option<crate::EtlSinkType>,
184185
) -> Result<crate::SetupResponse> {
185186
let request = crate::SetupRequest {
186187
run_id,
187188
metadata,
188189
datasets,
190+
etl_sink_type,
189191
};
190192
let rpc_request = JsonRpcRequest::new(1, crate::methods::SETUP, request);
191193
let response = self.call_typed(rpc_request).await?;

crates/system-adapter-protocol/src/lib.rs

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,9 @@ limitations under the License.
4141
//!
4242
//! // Setup a benchmark run
4343
//! let run_id = Uuid::new_v4();
44-
//! let setup_response = client.setup(run_id, HashMap::new(), HashMap::new()).await?;
44+
//! let setup_response = client
45+
//! .setup(run_id, HashMap::new(), HashMap::new(), None)
46+
//! .await?;
4547
//!
4648
//! println!("Driver: {:?}", setup_response.driver);
4749
//!
@@ -57,7 +59,7 @@ limitations under the License.
5759
//! # #[cfg(feature = "server")]
5860
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
5961
//! use system_adapter_protocol::{
60-
//! AdbcDriver, DatasetConfig, Handler, Server, SetupResponse, TeardownResponse,
62+
//! AdbcDriver, DatasetConfig, EtlSinkType, Handler, Server, SetupResponse, TeardownResponse,
6163
//! };
6264
//! use async_trait::async_trait;
6365
//! use std::collections::HashMap;
@@ -72,11 +74,13 @@ limitations under the License.
7274
//! run_id: Uuid,
7375
//! metadata: HashMap<String, serde_json::Value>,
7476
//! datasets: HashMap<String, DatasetConfig>,
77+
//! etl_sink_type: Option<EtlSinkType>,
7578
//! ) -> Result<SetupResponse, String> {
76-
//! let _ = (metadata, datasets);
79+
//! let _ = (metadata, datasets, etl_sink_type);
7780
//! Ok(SetupResponse {
7881
//! driver: AdbcDriver::Flightsql,
7982
//! db_kwargs: HashMap::new(),
83+
//! catalog_namespace: None,
8084
//! })
8185
//! }
8286
//!
@@ -121,6 +125,17 @@ pub enum AdbcDriver {
121125
Postgresql,
122126
}
123127

128+
/// ETL sink type used by spicebench for this run.
129+
///
130+
/// This is provided to adapters in [`SetupRequest`] so they can optionally
131+
/// adjust setup behavior based on how data is loaded.
132+
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
133+
#[serde(rename_all = "snake_case")]
134+
pub enum EtlSinkType {
135+
Hive,
136+
Adbc,
137+
}
138+
124139
impl std::fmt::Display for AdbcDriver {
125140
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
126141
match self {
@@ -160,6 +175,9 @@ pub struct SetupRequest {
160175
pub metadata: HashMap<String, serde_json::Value>,
161176
/// Map of dataset name to dataset definition
162177
pub datasets: HashMap<String, DatasetConfig>,
178+
/// Optional ETL sink type selected by spicebench.
179+
#[serde(skip_serializing_if = "Option::is_none")]
180+
pub etl_sink_type: Option<EtlSinkType>,
163181
}
164182

165183
/// Response from setup request containing ADBC connection information
@@ -169,6 +187,10 @@ pub struct SetupResponse {
169187
pub driver: AdbcDriver,
170188
/// Driver-specific connection parameters
171189
pub db_kwargs: HashMap<String, serde_json::Value>,
190+
/// Optional catalog/namespace path where benchmark tables were created
191+
/// (e.g. "catalog.schema").
192+
#[serde(skip_serializing_if = "Option::is_none")]
193+
pub catalog_namespace: Option<String>,
172194
}
173195
/// Request to teardown a benchmark run
174196
///

crates/system-adapter-protocol/src/server.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,8 @@ limitations under the License.
1717
//! Server implementations for system adapter JSON-RPC protocol.
1818
1919
use crate::{
20-
DatasetConfig, JsonRpcError, JsonRpcResponse, MetricsRequest, MetricsResponse, SetupRequest,
21-
SetupResponse, TeardownRequest, TeardownResponse, error_codes, methods,
20+
DatasetConfig, EtlSinkType, JsonRpcError, JsonRpcResponse, MetricsRequest, MetricsResponse,
21+
SetupRequest, SetupResponse, TeardownRequest, TeardownResponse, error_codes, methods,
2222
};
2323
use async_trait::async_trait;
2424
use serde::de::DeserializeOwned;
@@ -76,6 +76,7 @@ pub trait Handler: Send + Sync {
7676
run_id: Uuid,
7777
metadata: HashMap<String, serde_json::Value>,
7878
datasets: HashMap<String, DatasetConfig>,
79+
etl_sink_type: Option<EtlSinkType>,
7980
) -> std::result::Result<SetupResponse, String>;
8081

8182
/// Teardown a benchmark run
@@ -235,7 +236,7 @@ impl<H: Handler> Server<H> {
235236
};
236237
Self::handler_response(
237238
self.handler
238-
.setup(req.run_id, req.metadata, req.datasets)
239+
.setup(req.run_id, req.metadata, req.datasets, req.etl_sink_type)
239240
.await,
240241
id,
241242
)
@@ -285,10 +286,12 @@ mod tests {
285286
_run_id: Uuid,
286287
_metadata: HashMap<String, serde_json::Value>,
287288
_datasets: HashMap<String, DatasetConfig>,
289+
_etl_sink_type: Option<EtlSinkType>,
288290
) -> std::result::Result<SetupResponse, String> {
289291
Ok(SetupResponse {
290292
driver: crate::AdbcDriver::Flightsql,
291293
db_kwargs: HashMap::new(),
294+
catalog_namespace: None,
292295
})
293296
}
294297

crates/test-framework/src/queries/mod.rs

Lines changed: 30 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,10 @@ impl Query {
225225
/// becomes:
226226
/// `SELECT * FROM arrow.customer WHERE c_custkey = 1`
227227
///
228+
/// Multi-part namespaces are also supported. For example, if `reference_schema`
229+
/// is `catalog.schema`, the query becomes:
230+
/// `SELECT * FROM catalog.schema.customer WHERE c_custkey = 1`
231+
///
228232
/// Uses `DataFusion`'s SQL parser to parse the query, rewrite all table references,
229233
/// and unparse back to SQL. This works with any valid SQL query.
230234
///
@@ -237,6 +241,12 @@ impl Query {
237241
use sqlparser::parser::Parser;
238242
use std::ops::ControlFlow;
239243

244+
let namespace_parts: Vec<ObjectNamePart> = reference_schema
245+
.split('.')
246+
.filter(|part| !part.is_empty())
247+
.map(|part| ObjectNamePart::Identifier(Ident::new(part)))
248+
.collect();
249+
240250
// Parse the SQL query using sqlparser
241251
let dialect = sqlparser::dialect::PostgreSqlDialect {};
242252
let mut statements = Parser::parse_sql(&dialect, &self.sql).map_err(|e| {
@@ -262,10 +272,9 @@ impl Query {
262272
let _ = visit_relations_mut(statement, |table_name| {
263273
// Only rewrite if the table doesn't already have a schema prefix (single-part name)
264274
if table_name.0.len() == 1 {
265-
// Prepend the reference schema to the table name
266-
table_name
267-
.0
268-
.insert(0, ObjectNamePart::Identifier(Ident::new(reference_schema)));
275+
for namespace_part in namespace_parts.iter().rev() {
276+
table_name.0.insert(0, namespace_part.clone());
277+
}
269278
}
270279
ControlFlow::<()>::Continue(())
271280
});
@@ -992,6 +1001,23 @@ mod tests {
9921001
);
9931002
}
9941003

1004+
#[test]
1005+
fn test_rewrite_with_multi_part_namespace() {
1006+
let query = Query::new(
1007+
"test_multi_part".into(),
1008+
"SELECT * FROM customer JOIN orders ON customer.c_custkey = orders.o_custkey".into(),
1009+
false,
1010+
);
1011+
1012+
let rewritten = query
1013+
.rewrite_with_reference_schema("main.catalog")
1014+
.expect("Failed to rewrite query with multi-part namespace");
1015+
1016+
let sql = rewritten.sql.as_ref();
1017+
assert!(sql.contains("main.catalog.customer"));
1018+
assert!(sql.contains("main.catalog.orders"));
1019+
}
1020+
9951021
#[test]
9961022
fn test_rewrite_with_existing_schema_prefix() {
9971023
let query = Query::new(

src/commands/load/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -331,6 +331,7 @@ pub(crate) async fn run(
331331
etl_pipeline: &mut ETLPipeline,
332332
checkpoint_steps: Option<usize>,
333333
checkpoint_dir: Option<&Path>,
334+
query_catalog_namespace: Option<String>,
334335
) -> anyhow::Result<()> {
335336
let metric_attributes = run_metric_attributes(common_args);
336337

@@ -419,8 +420,12 @@ pub(crate) async fn run(
419420
let has_checkpoint_validation =
420421
common_args.validate_results && checkpoint_steps.is_some() && checkpoint_dir.is_some();
421422

422-
let (query_set, test_builder) =
423-
super::build_test_with_validation(scenario, test_builder).await?;
423+
let (query_set, test_builder) = super::build_test_with_validation(
424+
scenario,
425+
test_builder,
426+
query_catalog_namespace.as_deref(),
427+
)
428+
.await?;
424429

425430
// Build ordered query names for mapping checkpoint query_idx → query name.
426431
let queries = query_set.get_queries(None, None, None).await?;

src/commands/mod.rs

Lines changed: 44 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,22 @@ pub(crate) fn create_telemetry_with_resource(common: &CommonArgs, resource: Reso
5050
Telemetry::new_with_resource(&resource, "SPICEAI_BENCHMARK_METRICS_KEY")
5151
}
5252

53+
fn rewrite_queries_with_catalog_namespace(
54+
queries: Vec<test_framework::queries::Query>,
55+
query_catalog_namespace: Option<&str>,
56+
) -> anyhow::Result<Vec<test_framework::queries::Query>> {
57+
if let Some(catalog_namespace) = query_catalog_namespace
58+
&& !catalog_namespace.trim().is_empty()
59+
{
60+
return queries
61+
.into_iter()
62+
.map(|query| query.rewrite_with_reference_schema(catalog_namespace))
63+
.collect::<anyhow::Result<Vec<_>>>();
64+
}
65+
66+
Ok(queries)
67+
}
68+
5369
/// Build a test configuration with validation data if applicable
5470
///
5571
/// This is a common helper for bench, throughput, and load tests that:
@@ -64,9 +80,13 @@ pub(crate) fn create_telemetry_with_resource(common: &CommonArgs, resource: Reso
6480
pub(crate) async fn build_test_with_validation(
6581
scenario: &Scenario,
6682
test_builder: NotStarted,
83+
query_catalog_namespace: Option<&str>,
6784
) -> anyhow::Result<(QuerySet, NotStarted)> {
6885
let query_set = scenario.load_query_set()?;
69-
let queries = query_set.get_queries(None, None, None).await?;
86+
let queries = rewrite_queries_with_catalog_namespace(
87+
query_set.get_queries(None, None, None).await?,
88+
query_catalog_namespace,
89+
)?;
7090

7191
let test_builder = test_builder
7292
.with_query_set(queries)
@@ -117,3 +137,26 @@ macro_rules! wait_test_and_memory {
117137
}
118138
};
119139
}
140+
141+
#[cfg(test)]
142+
mod tests {
143+
use super::*;
144+
145+
#[test]
146+
fn test_rewrite_queries_with_catalog_namespace() {
147+
let queries = vec![test_framework::queries::Query::new(
148+
"q1".into(),
149+
"SELECT * FROM customer".into(),
150+
false,
151+
)];
152+
153+
let rewritten = rewrite_queries_with_catalog_namespace(queries, Some("catalog.schema"))
154+
.expect("rewrite should succeed");
155+
156+
assert_eq!(rewritten.len(), 1);
157+
assert_eq!(
158+
rewritten[0].sql.as_ref(),
159+
"SELECT * FROM catalog.schema.customer"
160+
);
161+
}
162+
}

src/main.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ async fn run_benchmark(
127127
let data_source: Arc<dyn DataStorage> = source.clone();
128128

129129
let mut setup_response_for_run: Option<system_adapter_protocol::SetupResponse> = None;
130+
let etl_sink_type = match common.etl_sink {
131+
EtlSink::Hive => system_adapter_protocol::EtlSinkType::Hive,
132+
EtlSink::Adbc => system_adapter_protocol::EtlSinkType::Adbc,
133+
};
130134

131135
let (target, target_config, target_kind, adbc_sink): (
132136
Arc<dyn Sink>,
@@ -177,6 +181,7 @@ async fn run_benchmark(
177181
run_id,
178182
setup_metadata.clone(),
179183
setup_pipeline.create_tables_request_datasets(),
184+
Some(etl_sink_type),
180185
)
181186
.await
182187
.map_err(|e| anyhow::anyhow!("Failed to setup system adapter: {e}"))?;
@@ -245,12 +250,14 @@ async fn run_benchmark(
245250
run_id,
246251
setup_metadata,
247252
pipeline.create_tables_request_datasets(),
253+
Some(etl_sink_type),
248254
)
249255
.await
250256
.map_err(|e| anyhow::anyhow!("Failed to setup system adapter: {e}"))?
251257
};
252258

253259
let driver_name = setup_response.driver.to_string();
260+
let query_catalog_namespace = setup_response.catalog_namespace.clone();
254261
let db_kwargs = setup_response.db_kwargs;
255262

256263
let load_conn = match AdbcConnection::create(&driver_name, db_kwargs) {
@@ -273,6 +280,7 @@ async fn run_benchmark(
273280
&mut pipeline,
274281
checkpoint_steps,
275282
Some(checkpoint_dir.path()),
283+
query_catalog_namespace,
276284
)
277285
.await?;
278286

system-adapters/databricks/src/main.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ use reqwest::StatusCode;
2525
use serde::{Deserialize, Serialize};
2626
use serde_json::{Value, json};
2727
use system_adapter_protocol::{
28-
AdbcDriver, DatasetConfig, Handler, Server, SetupResponse, TeardownResponse,
28+
AdbcDriver, DatasetConfig, EtlSinkType, Handler, Server, SetupResponse, TeardownResponse,
2929
};
3030
use uuid::Uuid;
3131

@@ -1707,7 +1707,9 @@ impl Handler for DatabricksAdapter {
17071707
run_id: Uuid,
17081708
metadata: HashMap<String, Value>,
17091709
datasets: HashMap<String, DatasetConfig>,
1710+
etl_sink_type: Option<EtlSinkType>,
17101711
) -> std::result::Result<SetupResponse, String> {
1712+
let _ = etl_sink_type;
17111713
eprintln!("[databricks-adapter] setup: run_id={run_id}");
17121714
eprintln!("[databricks-adapter] endpoint={}", self.config.endpoint);
17131715

@@ -1838,6 +1840,7 @@ impl Handler for DatabricksAdapter {
18381840
Ok(SetupResponse {
18391841
driver: AdbcDriver::Postgresql,
18401842
db_kwargs: HashMap::from([("uri".to_string(), Value::String(pg_uri))]),
1843+
catalog_namespace: Some(format!("{}.{}", self.config.catalog, self.config.schema)),
18411844
})
18421845
}
18431846
_ => Ok(SetupResponse {
@@ -1846,6 +1849,7 @@ impl Handler for DatabricksAdapter {
18461849
"uri".to_string(),
18471850
Value::String(self.databricks_uri()),
18481851
)]),
1852+
catalog_namespace: Some(format!("{}.{}", self.config.catalog, self.config.schema)),
18491853
}),
18501854
}
18511855
}

0 commit comments

Comments
 (0)