Skip to content

Commit dc25e81

Browse files
committed
Updates
1 parent 835f411 commit dc25e81

8 files changed

Lines changed: 88 additions & 141 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -136,6 +136,7 @@ version.workspace = true
136136
[dependencies]
137137
adbc_client = { path = "crates/adbc_client" }
138138
arrow.workspace = true
139+
arrow-schema.workspace = true
139140
async-trait.workspace = true
140141
clap.workspace = true
141142
data-generation = { path = "crates/data-generation" }

README.md

Lines changed: 8 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ flowchart TB
1717
direction TB
1818
1919
subgraph setup_phase["1 · Setup (JSON-RPC)"]
20-
adapter_iface["System Adapter Protocol\n(setup / create_tables /\nquery_method / teardown / metrics)"]
20+
adapter_iface["System Adapter Protocol\n(setup / create_tables /\nteardown / metrics)"]
2121
spice["Spice Cloud Adapter"]
2222
databricks["Databricks Adapter"]
2323
other["... Other Adapters"]
@@ -104,9 +104,8 @@ flowchart TB
104104
105105
orchestrator -->|"start run"| run
106106
107-
adapter_iface -->|"setup(run_id, datasets)"| sut
107+
adapter_iface -->|"setup(run_id, datasets)\n→ ADBC driver + kwargs"| executors
108108
adapter_iface -->|"create_tables(run_id)"| sut
109-
adapter_iface -->|"query_method(run_id)\n→ ADBC driver + kwargs"| executors
110109
setup_phase -->|"system ready"| bench_phase
111110
bench_phase -->|"benchmark complete"| teardown_phase
112111
@@ -123,7 +122,7 @@ A **Run** is a single end-to-end execution of the benchmark for one system. Each
123122

124123
| Phase | What happens | Timed? |
125124
| ------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
126-
| **1. Setup** | Connect to system adapter via JSON-RPC (stdio or HTTP). Call `setup(run_id, datasets)` to provision the SUT, then `create_tables(run_id)`, then `query_method(run_id)` to get ADBC driver config. | No |
125+
| **1. Setup** | Connect to system adapter via JSON-RPC (stdio or HTTP). Call `setup(run_id, datasets)` to provision the SUT and return ADBC driver config, then `create_tables(run_id)`. | No |
127126
| **2. Benchmark (timed)** | Three sequential stages — warm-up (1× query set), baseline (10% of duration, 60s–600s), and load test (full duration with concurrent clients). | Yes |
128127
| **3. Teardown** | Call `teardown(run_id)` via the adapter to deprovision resources and clean up. | No |
129128

@@ -148,7 +147,7 @@ Common CLI/workflow usage:
148147
| Component | Responsibility |
149148
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
150149
| **GitHub Actions** | Orchestrates Runs on schedule, PR, or manual dispatch. Manages the full Run lifecycle across phases. |
151-
| **System Adapter Protocol** | JSON-RPC 2.0 interface (stdio or HTTP) for each platform. Methods: `setup`, `create_tables`, `query_method`, `teardown`, `metrics`. |
150+
| **System Adapter Protocol** | JSON-RPC 2.0 interface (stdio or HTTP) for each platform. Methods: `setup`, `create_tables`, `teardown`, `metrics`. |
152151
| **Query Executors** | Pluggable query execution: ADBC direct (FlightSQL/Databricks drivers), HTTP (`/v1/sql`), or distributed (`/v1/queries` with polling). |
153152
| **Data Generator** | Standalone binary (`data-generation`) that produces TPC-H partitioned Parquet batches and writes them to S3. |
154153
| **Test Framework** | Core engine managing the warm-up → baseline → load test pipeline, query sets (TPC-H, TPC-DS, ClickBench, parameterized, scenario), and statistics collection. |
@@ -218,9 +217,8 @@ To benchmark a new platform, implement the JSON-RPC 2.0 adapter with these metho
218217

219218
1. **`setup(run_id, datasets)`** — Provision infrastructure and configure the target system.
220219
2. **`create_tables(run_id)`** — Create/register destination tables for the benchmark datasets.
221-
3. **`query_method(run_id)`** — Return the ADBC driver type (`flightsql` or `databricks`) and connection kwargs so SpiceBench can establish a direct query connection.
222-
4. **`teardown(run_id)`** — Clean up provisioned resources.
223-
5. **`metrics(run_id)`** *(optional)* — Return current resource usage (CPU, memory, disk, IOPS) and ingestion progress (rows, bytes, rows/s, active connections).
220+
3. **`teardown(run_id)`** — Clean up provisioned resources.
221+
4. **`metrics(run_id)`** *(optional)* — Return current resource usage (CPU, memory, disk, IOPS) and ingestion progress (rows, bytes, rows/s, active connections).
224222

225223
The adapter can run as a **stdio** child process or as an **HTTP** server.
226224

@@ -247,9 +245,8 @@ For each run, SpiceBench calls adapter JSON-RPC methods in this order:
247245

248246
1. `setup(run_id, datasets, metadata)`
249247
2. `create_tables(run_id)`
250-
3. `query_method(run_id)`
251-
4. benchmark execution and optional periodic `metrics(run_id)` scraping
252-
5. `teardown(run_id)`
248+
3. benchmark execution and optional periodic `metrics(run_id)` scraping
249+
4. `teardown(run_id)`
253250

254251
Tiny `create_tables` request example:
255252

crates/system-adapter-protocol/src/client.rs

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -207,16 +207,6 @@ impl Client {
207207
.ok_or_else(|| ClientError::InvalidResponse("Missing result".to_string()))
208208
}
209209

210-
/// Get query method/driver information for a benchmark run
211-
pub async fn query_method(&mut self, run_id: uuid::Uuid) -> Result<crate::QueryMethodResponse> {
212-
let request = crate::QueryMethodRequest { run_id };
213-
let rpc_request = JsonRpcRequest::new(1, crate::methods::QUERY_METHOD, request);
214-
let response = self.call_typed(rpc_request).await?;
215-
response
216-
.result
217-
.ok_or_else(|| ClientError::InvalidResponse("Missing result".to_string()))
218-
}
219-
220210
/// Teardown a benchmark run
221211
pub async fn teardown(&mut self, run_id: uuid::Uuid) -> Result<crate::TeardownResponse> {
222212
let request = crate::TeardownRequest { run_id };

crates/system-adapter-protocol/src/lib.rs

Lines changed: 11 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ limitations under the License.
2222
//!
2323
//! # Features
2424
//!
25-
//! - **Protocol types**: Request/response types for setup, create_tables, query_method, and teardown
25+
//! - **Protocol types**: Request/response types for setup, create_tables, teardown, and metrics
2626
//! - **Client**: Ready-to-use client with Stdio and HTTP transports (requires `client` feature)
2727
//! - **Server**: Easy server implementation via Handler trait (requires `server` feature)
2828
//! - **JSON-RPC**: Standard JSON-RPC 2.0 envelope types
@@ -44,9 +44,7 @@ limitations under the License.
4444
//! let setup_response = client.setup(run_id, HashMap::new(), HashMap::new()).await?;
4545
//! let create_tables_response = client.create_tables(run_id).await?;
4646
//!
47-
//! // Get query method information
48-
//! let query_response = client.query_method(run_id).await?;
49-
//! println!("Driver: {:?}", query_response.driver);
47+
//! println!("Driver: {:?}", setup_response.driver);
5048
//!
5149
//! // Teardown the run
5250
//! let teardown_response = client.teardown(run_id).await?;
@@ -60,8 +58,8 @@ limitations under the License.
6058
//! # #[cfg(feature = "server")]
6159
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
6260
//! use system_adapter_protocol::{
63-
//! AdbcDriver, CreateTablesResponse, DatasetConfig, Handler, QueryMethodResponse, Server,
64-
//! SetupResponse, TeardownResponse,
61+
//! AdbcDriver, CreateTablesResponse, DatasetConfig, Handler, Server, SetupResponse,
62+
//! TeardownResponse,
6563
//! };
6664
//! use async_trait::async_trait;
6765
//! use std::collections::HashMap;
@@ -79,11 +77,7 @@ limitations under the License.
7977
//! ) -> Result<SetupResponse, String> {
8078
//! // Your setup logic here
8179
//! let _ = metadata;
82-
//! Ok(SetupResponse { ok: true })
83-
//! }
84-
//!
85-
//! async fn query_method(&mut self, run_id: Uuid) -> Result<QueryMethodResponse, String> {
86-
//! Ok(QueryMethodResponse {
80+
//! Ok(SetupResponse {
8781
//! driver: AdbcDriver::Flightsql,
8882
//! db_kwargs: HashMap::new(),
8983
//! })
@@ -162,11 +156,13 @@ pub struct SetupRequest {
162156
pub metadata: HashMap<String, serde_json::Value>,
163157
}
164158

165-
/// Response from setup request
166-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
159+
/// Response from setup request containing ADBC connection information
160+
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
167161
pub struct SetupResponse {
168-
/// Indicates if setup was successful
169-
pub ok: bool,
162+
/// ADBC driver to use for database connections
163+
pub driver: AdbcDriver,
164+
/// Driver-specific connection parameters
165+
pub db_kwargs: HashMap<String, serde_json::Value>,
170166
}
171167

172168
/// Request to create benchmark tables in the system under test.
@@ -185,24 +181,6 @@ pub struct CreateTablesResponse {
185181
pub ok: bool,
186182
}
187183

188-
/// Request to get query method/driver information
189-
///
190-
/// JSON-RPC method: `query_method`
191-
#[derive(Debug, Clone, Serialize, Deserialize)]
192-
pub struct QueryMethodRequest {
193-
/// Unique identifier for the benchmark run
194-
pub run_id: Uuid,
195-
}
196-
197-
/// Response containing database connection information
198-
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
199-
pub struct QueryMethodResponse {
200-
/// ADBC driver to use for database connections
201-
pub driver: AdbcDriver,
202-
/// Driver-specific connection parameters
203-
pub db_kwargs: HashMap<String, serde_json::Value>,
204-
}
205-
206184
/// Request to teardown a benchmark run
207185
///
208186
/// JSON-RPC method: `teardown`
@@ -377,7 +355,6 @@ pub mod error_codes {
377355
pub mod methods {
378356
pub const SETUP: &str = "setup";
379357
pub const CREATE_TABLES: &str = "create_tables";
380-
pub const QUERY_METHOD: &str = "query_method";
381358
pub const TEARDOWN: &str = "teardown";
382359
pub const METRICS: &str = "metrics";
383360
pub const RPC_METHODS: &str = "rpc.methods";

crates/system-adapter-protocol/src/server.rs

Lines changed: 5 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,8 @@ limitations under the License.
1818
1919
use crate::{
2020
CreateTablesRequest, CreateTablesResponse, DatasetConfig, JsonRpcError, JsonRpcResponse,
21-
MetricsRequest, MetricsResponse, QueryMethodRequest, QueryMethodResponse, SetupRequest,
22-
SetupResponse, TeardownRequest, TeardownResponse, error_codes, methods,
21+
MetricsRequest, MetricsResponse, SetupRequest, SetupResponse, TeardownRequest,
22+
TeardownResponse, error_codes, methods,
2323
};
2424
use async_trait::async_trait;
2525
use serde::de::DeserializeOwned;
@@ -68,7 +68,7 @@ pub type Result<T> = std::result::Result<T, ServerError>;
6868
/// Handler trait for implementing system adapter logic
6969
///
7070
/// Implement this trait to define how your system adapter handles
71-
/// setup, query_method, and teardown requests.
71+
/// setup, create_tables, and teardown requests.
7272
#[async_trait]
7373
pub trait Handler: Send + Sync {
7474
/// Setup a benchmark run
@@ -85,12 +85,6 @@ pub trait Handler: Send + Sync {
8585
run_id: Uuid,
8686
) -> std::result::Result<CreateTablesResponse, String>;
8787

88-
/// Get query method/driver information for a benchmark run
89-
async fn query_method(
90-
&mut self,
91-
run_id: Uuid,
92-
) -> std::result::Result<QueryMethodResponse, String>;
93-
9488
/// Teardown a benchmark run
9589
async fn teardown(&mut self, run_id: Uuid) -> std::result::Result<TeardownResponse, String>;
9690

@@ -111,7 +105,6 @@ pub trait Handler: Send + Sync {
111105
vec![
112106
methods::SETUP.to_string(),
113107
methods::CREATE_TABLES.to_string(),
114-
methods::QUERY_METHOD.to_string(),
115108
methods::TEARDOWN.to_string(),
116109
methods::METRICS.to_string(),
117110
methods::RPC_METHODS.to_string(),
@@ -189,7 +182,6 @@ impl<H: Handler> Server<H> {
189182
let result = match method {
190183
methods::SETUP => self.handle_setup(&request, id.clone()).await,
191184
methods::CREATE_TABLES => self.handle_create_tables(&request, id.clone()).await,
192-
methods::QUERY_METHOD => self.handle_query_method(&request, id.clone()).await,
193185
methods::TEARDOWN => self.handle_teardown(&request, id.clone()).await,
194186
methods::METRICS => self.handle_metrics(&request, id.clone()).await,
195187
methods::RPC_METHODS => self.handle_rpc_methods(id.clone()).await,
@@ -258,18 +250,6 @@ impl<H: Handler> Server<H> {
258250
)
259251
}
260252

261-
async fn handle_query_method(
262-
&mut self,
263-
request: &serde_json::Value,
264-
id: serde_json::Value,
265-
) -> serde_json::Value {
266-
let req: QueryMethodRequest = match Self::parse_params(request, &id) {
267-
Ok(r) => r,
268-
Err(e) => return e,
269-
};
270-
Self::handler_response(self.handler.query_method(req.run_id).await, id)
271-
}
272-
273253
async fn handle_create_tables(
274254
&mut self,
275255
request: &serde_json::Value,
@@ -327,14 +307,7 @@ mod tests {
327307
_datasets: HashMap<String, DatasetConfig>,
328308
_metadata: HashMap<String, serde_json::Value>,
329309
) -> std::result::Result<SetupResponse, String> {
330-
Ok(SetupResponse { ok: true })
331-
}
332-
333-
async fn query_method(
334-
&mut self,
335-
_run_id: Uuid,
336-
) -> std::result::Result<QueryMethodResponse, String> {
337-
Ok(QueryMethodResponse {
310+
Ok(SetupResponse {
338311
driver: crate::AdbcDriver::Flightsql,
339312
db_kwargs: HashMap::new(),
340313
})
@@ -366,7 +339,7 @@ mod tests {
366339
let response = server.handle_request(request).await;
367340

368341
assert!(response.get("result").is_some());
369-
assert_eq!(response["result"]["ok"], true);
342+
assert_eq!(response["result"]["driver"], "flightsql");
370343
}
371344

372345
#[tokio::test]

src/main.rs

Lines changed: 46 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,13 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::sync::Arc;
17+
use std::{collections::HashMap, sync::Arc};
1818

1919
use adbc_client::AdbcConnection;
20+
use arrow_schema::{DataType, Field, Schema, TimeUnit};
2021
use clap::Parser;
2122
use data_generation::config::{DatasetConfig as GenerationDatasetConfig, TargetConfig};
23+
use data_generation::dataset::Dataset;
2224
use data_generation::dataset::MutationConfig;
2325
use data_generation::storage::s3::S3Storage;
2426
use etl::sink::adbc::AdbcSink;
@@ -35,6 +37,28 @@ mod scenario;
3537
use crate::commands::connect_system_adapter;
3638
use crate::scenario::Scenario;
3739

40+
fn setup_request_datasets(dataset: &Arc<dyn Dataset>) -> HashMap<String, system_adapter_protocol::DatasetConfig> {
41+
dataset
42+
.tables()
43+
.into_iter()
44+
.map(|(name, table)| {
45+
let mut fields: Vec<_> = table.schema.fields().iter().cloned().collect();
46+
fields.push(Arc::new(Field::new(
47+
"__created_at",
48+
DataType::Timestamp(TimeUnit::Microsecond, Some("UTC".into())),
49+
true,
50+
)));
51+
52+
(
53+
name,
54+
system_adapter_protocol::DatasetConfig {
55+
schema: Arc::new(Schema::new(fields)),
56+
},
57+
)
58+
})
59+
.collect()
60+
}
61+
3862
#[derive(Parser)]
3963
#[command(author, version, about, long_about = None)]
4064
struct Cli {
@@ -93,12 +117,29 @@ async fn main() -> anyhow::Result<()> {
93117
};
94118

95119
let run_id = uuid::Uuid::new_v4();
120+
let mutations = MutationConfig::new(0.1, 0.1);
96121

97-
// --- Query method from system adapter ---
98-
let adbc_driver = match system_adapter_client.query_method(run_id).await {
99-
Ok(method) => method,
122+
let setup_dataset = dataset_source.create(&generation_config, &mutations)?;
123+
let datasets = setup_request_datasets(&setup_dataset);
124+
125+
let setup_metadata = std::collections::HashMap::from([
126+
(
127+
"executor_instance_type".to_string(),
128+
serde_json::Value::String(cli.common.executor_instance_type.clone()),
129+
),
130+
(
131+
"table_format".to_string(),
132+
serde_json::Value::String(cli.common.table_format.to_string()),
133+
),
134+
]);
135+
136+
let adbc_driver = match system_adapter_client
137+
.setup(run_id, datasets, setup_metadata)
138+
.await
139+
{
140+
Ok(response) => response,
100141
Err(e) => {
101-
return Err(anyhow::anyhow!("Failed to query system adapter: {e}"));
142+
return Err(anyhow::anyhow!("Failed to setup system adapter: {e}"));
102143
}
103144
};
104145

@@ -130,8 +171,6 @@ async fn main() -> anyhow::Result<()> {
130171
));
131172
};
132173

133-
let mutations = MutationConfig::new(0.1, 0.1);
134-
135174
let target = Arc::new(AdbcSink::new_without_table_creation(adbc_conn, None));
136175
let mut pipeline = ETLPipeline::new(
137176
dataset_source,
@@ -141,26 +180,6 @@ async fn main() -> anyhow::Result<()> {
141180
&mutations,
142181
)?;
143182

144-
let datasets = pipeline.setup_request_datasets();
145-
let setup_metadata = std::collections::HashMap::from([
146-
(
147-
"executor_instance_type".to_string(),
148-
serde_json::Value::String(cli.common.executor_instance_type.clone()),
149-
),
150-
(
151-
"table_format".to_string(),
152-
serde_json::Value::String(cli.common.table_format.to_string()),
153-
),
154-
]);
155-
156-
if let Err(e) = system_adapter_client
157-
.setup(run_id, datasets, setup_metadata)
158-
.await
159-
{
160-
pipeline.cancel();
161-
return Err(anyhow::anyhow!("Failed to setup system adapter: {e}"));
162-
}
163-
164183
if let Err(e) = system_adapter_client.create_tables(run_id).await {
165184
pipeline.cancel();
166185
return Err(anyhow::anyhow!(

0 commit comments

Comments
 (0)