Skip to content

Commit d027745

Browse files
committed
Merge origin/trunk into lukim/query-method
2 parents f72a4e8 + e6b6c30 commit d027745

22 files changed

Lines changed: 903 additions & 179 deletions

File tree

.claude/skills/system-adapter-builder/SKILL.md

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
---
22
name: system-adapter-builder
3-
description: Build or update a SpiceBench system adapter with JSON-RPC over stdio and HTTP, including setup/query_method/teardown/metrics support and template validation.
3+
description: Build or update a SpiceBench system adapter with JSON-RPC over stdio and HTTP, including setup/create_tables/query_method/teardown/metrics support and template validation.
44
---
55

66
# SpiceBench System Adapter Builder
@@ -17,6 +17,7 @@ A JSON-RPC 2.0 adapter that supports both transports:
1717
Required methods:
1818

1919
- `setup(run_id, datasets)`
20+
- `create_tables(run_id)`
2021
- `query_method(run_id)`
2122
- `teardown(run_id)`
2223
- `metrics(run_id)`
@@ -34,15 +35,17 @@ Required methods:
3435

3536
1. Copy the nearest template from `system-adapters/templates/<language>`.
3637
2. Keep request/response envelopes JSON-RPC 2.0 compliant (`jsonrpc`, `id`, `method`, `params`).
37-
3. Implement `setup` and `teardown` with run-scoped resources keyed by `run_id`.
38-
4. Implement `query_method` to return:
38+
3. Implement `setup` with run-scoped resources keyed by `run_id`.
39+
4. Implement `create_tables` so the adapter creates/registers benchmark destination tables.
40+
5. Implement `query_method` to return:
3941
- `driver`: typically `flightsql` or `databricks`
4042
- `db_kwargs`: real endpoint + auth kwargs for the SUT
41-
5. Implement `metrics` to return both objects:
43+
6. Implement `teardown` with run-scoped cleanup keyed by `run_id`.
44+
7. Implement `metrics` to return both objects:
4245
- `resource`: CPU, memory, disk bytes, disk IOPS
4346
- `ingestion`: rows, bytes, rows/s, active connections
44-
6. Keep stdio and HTTP using the same dispatcher so behavior is identical.
45-
7. Return JSON-RPC errors with standard codes:
47+
8. Keep stdio and HTTP using the same dispatcher so behavior is identical.
48+
9. Return JSON-RPC errors with standard codes:
4649
- `-32700` parse error
4750
- `-32600` invalid request
4851
- `-32601` method not found
@@ -70,6 +73,7 @@ If any metric is unavailable, return `0`/`0.0` and document why.
7073

7174
- Adapter responds to all required methods over stdio and HTTP.
7275
- `rpc.methods` includes every exposed method.
76+
- `create_tables` creates/registers benchmark tables for each dataset.
7377
- `query_method` returns a valid `driver` and complete `db_kwargs`.
7478
- `metrics` returns both `resource` and `ingestion` objects.
7579
- Language build/syntax checks pass:

.github/workflows/validate_system_adapter_templates.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,9 +152,10 @@ jobs:
152152
working-directory: system-adapters/templates/go
153153
run: go build ./...
154154
- name: Smoke test Go template (HTTP setup)
155+
working-directory: system-adapters/templates/go
155156
run: |
156157
set -euo pipefail
157-
go run ./system-adapters/templates/go --transport http --host 127.0.0.1 --port 18083 --path /jsonrpc > /tmp/go-template.log 2>&1 &
158+
go run . --transport http --host 127.0.0.1 --port 18083 --path /jsonrpc > /tmp/go-template.log 2>&1 &
158159
pid=$!
159160
trap 'kill "$pid" 2>/dev/null || true' EXIT
160161

Cargo.lock

Lines changed: 20 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ exclude = ["system-adapters/databricks"]
33
members = [
44
".",
55
"crates/adbc_client",
6-
"crates/app",
6+
"crates/app", "crates/checkpointer",
77
"crates/data-generation",
88
"crates/duration-parse", "crates/etl",
99
"crates/flight_client",

README.md

Lines changed: 39 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ flowchart TB
1717
direction TB
1818
1919
subgraph setup_phase["1 · Setup (JSON-RPC)"]
20-
adapter_iface["System Adapter Protocol\n(setup / teardown / metrics)"]
20+
adapter_iface["System Adapter Protocol\n(setup / create_tables /\nquery_method / teardown / metrics)"]
2121
spice["Spice Cloud Adapter"]
2222
databricks["Databricks Adapter"]
2323
other["... Other Adapters"]
@@ -104,8 +104,9 @@ flowchart TB
104104
105105
orchestrator -->|"start run"| run
106106
107-
adapter_iface -->|"setup(run_id)"| sut
108-
adapter_iface -->|"setup(run_id, datasets)\n→ ADBC driver + kwargs"| executors
107+
adapter_iface -->|"setup(run_id, datasets)"| sut
108+
adapter_iface -->|"create_tables(run_id)"| sut
109+
adapter_iface -->|"query_method(run_id)\n→ ADBC driver + kwargs"| executors
109110
setup_phase -->|"system ready"| bench_phase
110111
bench_phase -->|"benchmark complete"| teardown_phase
111112
@@ -120,11 +121,11 @@ flowchart TB
120121

121122
A **Run** is a single end-to-end execution of the benchmark for one system. Each Run proceeds through three phases:
122123

123-
| Phase | What happens | Timed? |
124-
| ------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------ | ------ |
125-
| **1. Setup** | Connect to system adapter via JSON-RPC (stdio or HTTP). Call `setup(run_id, datasets)` to provision the SUT and return ADBC driver config. | No |
126-
| **2. Benchmark (timed)** | Three sequential stages — warm-up (1× query set), baseline (10% of duration, 60s–600s), and load test (full duration with concurrent clients). | Yes |
127-
| **3. Teardown** | Call `teardown(run_id)` via the adapter to deprovision resources and clean up. | No |
124+
| Phase | What happens | Timed? |
125+
| ------------------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------ |
126+
| **1. Setup** | Connect to system adapter via JSON-RPC (stdio or HTTP). Call `setup(run_id, datasets)` to provision the SUT, then `create_tables(run_id)`, then `query_method(run_id)` to get ADBC driver config. | No |
127+
| **2. Benchmark (timed)** | Three sequential stages — warm-up (1× query set), baseline (10% of duration, 60s–600s), and load test (full duration with concurrent clients). | Yes |
128+
| **3. Teardown** | Call `teardown(run_id)` via the adapter to deprovision resources and clean up. | No |
128129

129130
The **E2E benchmark duration** (phase 2, load test stage) is the primary ranking metric. After the load test, each query's p99 latency is compared against the baseline: >20% increase = FAIL, 10–20% = WARN, ≥3 WARNs = FAIL.
130131

@@ -147,7 +148,7 @@ Common CLI/workflow usage:
147148
| Component | Responsibility |
148149
| --------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------------------- |
149150
| **GitHub Actions** | Orchestrates Runs on schedule, PR, or manual dispatch. Manages the full Run lifecycle across phases. |
150-
| **System Adapter Protocol** | JSON-RPC 2.0 interface (stdio or HTTP) for each platform. Methods: `setup`, `teardown`, `metrics`. |
151+
| **System Adapter Protocol** | JSON-RPC 2.0 interface (stdio or HTTP) for each platform. Methods: `setup`, `create_tables`, `query_method`, `teardown`, `metrics`. |
151152
| **Query Executors** | Pluggable query execution: ADBC direct (FlightSQL/Databricks drivers), HTTP (`/v1/sql`), or distributed (`/v1/queries` with polling). |
152153
| **Data Generator** | Standalone binary (`data-generation`) that produces TPC-H partitioned Parquet batches and writes them to S3. |
153154
| **Test Framework** | Core engine managing the warm-up → baseline → load test pipeline, query sets (TPC-H, TPC-DS, ClickBench, parameterized, scenario), and statistics collection. |
@@ -216,8 +217,10 @@ Results from every Run are published to [SpiceBench.com](https://spicebench.com)
216217
To benchmark a new platform, implement the JSON-RPC 2.0 adapter with these methods:
217218

218219
1. **`setup(run_id, datasets)`** — Provision infrastructure and configure the target system.
219-
2. **`teardown(run_id)`** — Clean up provisioned resources.
220-
3. **`metrics(run_id)`** *(optional)* — Return current resource usage (CPU, memory, disk, IOPS) and ingestion progress (rows, bytes, rows/s, active connections).
220+
2. **`create_tables(run_id)`** — Create/register destination tables for the benchmark datasets.
221+
3. **`query_method(run_id)`** — Return the ADBC driver type (`flightsql` or `databricks`) and connection kwargs so SpiceBench can establish a direct query connection.
222+
4. **`teardown(run_id)`** — Clean up provisioned resources.
223+
5. **`metrics(run_id)`** *(optional)* — Return current resource usage (CPU, memory, disk, IOPS) and ingestion progress (rows, bytes, rows/s, active connections).
221224

222225
The adapter can run as a **stdio** child process or as an **HTTP** server.
223226

@@ -236,7 +239,30 @@ The `spicebench` CLI connects to a system adapter using JSON-RPC 2.0 over either
236239
- **stdio transport**: use `--system-adapter-stdio-cmd` (SpiceBench starts the child process).
237240
- **HTTP transport**: use `--system-adapter-http-url` (SpiceBench connects to a remote adapter endpoint).
238241
- **execution mode**: `adapter-command` (default) dispatches `spicebench run ...` to adapter JSON-RPC `run.load`.
239-
- **execution mode**: `direct-query` runs the load/query path directly via ADBC, using the adapter only for setup/teardown/metrics.
242+
- **execution mode**: `direct-query` runs the load/query path directly via ADBC, using the adapter for setup/table creation/teardown/metrics.
243+
244+
#### Adapter lifecycle (direct-query mode)
245+
246+
For each run, SpiceBench calls adapter JSON-RPC methods in this order:
247+
248+
1. `setup(run_id, datasets, metadata)`
249+
2. `create_tables(run_id)`
250+
3. `query_method(run_id)`
251+
4. benchmark execution and optional periodic `metrics(run_id)` scraping
252+
5. `teardown(run_id)`
253+
254+
Tiny `create_tables` request example:
255+
256+
```json
257+
{
258+
"jsonrpc": "2.0",
259+
"id": 2,
260+
"method": "create_tables",
261+
"params": {
262+
"run_id": "00000000-0000-0000-0000-000000000000"
263+
}
264+
}
265+
```
240266

241267
#### Stdio example (child process started by SpiceBench)
242268

@@ -268,7 +294,7 @@ Notes:
268294
- `--system-adapter-stdio-args` passes CLI args to the stdio adapter command.
269295
- `--system-adapter-env` is only valid for stdio transport.
270296

271-
#### Direct-query example (ADBC query path, adapter for setup/teardown only)
297+
#### Direct-query example (ADBC query path, adapter for setup/table creation/teardown)
272298

273299
```bash
274300
spicebench \

crates/checkpointer/Cargo.toml

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
[package]
2+
name = "checkpointer"
3+
edition.workspace = true
4+
exclude.workspace = true
5+
homepage.workspace = true
6+
license.workspace = true
7+
repository.workspace = true
8+
rust-version.workspace = true
9+
version.workspace = true
10+
11+
[dependencies]
12+
adbc_client = { path = "../adbc_client" }
13+
anyhow.workspace = true
14+
arrow.workspace = true
15+
async-trait.workspace = true
16+
chrono.workspace = true
17+
clap = { workspace = true, features = ["derive"] }
18+
data-generation = { path = "../data-generation" }
19+
etl = { path = "../etl" }
20+
serde_json.workspace = true
21+
system-adapter-protocol = { path = "../system-adapter-protocol" }
22+
tokio.workspace = true
23+
tokio-util.workspace = true
24+
tracing.workspace = true
25+
tracing-subscriber = { workspace = true, features = ["env-filter"] }

crates/checkpointer/README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
# Checkpointer
2+
3+
Runs ETL to a point-in-time, then captures the expected results from queries.

crates/checkpointer/src/main.rs

Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::sync::Arc;
18+
19+
use adbc_client::AdbcConnection;
20+
use clap::Parser;
21+
use data_generation::config::{DatasetConfig, TargetConfig};
22+
use data_generation::dataset::MutationConfig;
23+
use data_generation::storage::s3::S3Storage;
24+
use etl::sink::adbc::AdbcSink;
25+
use etl::{DatasetSource, ETLPipeline, PipelineState, StopReason};
26+
use serde_json::Value;
27+
use tracing_subscriber::EnvFilter;
28+
29+
#[derive(Parser)]
30+
#[command(
31+
about = "Run an ETL pipeline that reads from S3, rehydrates data, and writes directly to a SUT via ADBC"
32+
)]
33+
struct Cli {
34+
/// Dataset type: "tpch" or "simple_sequence"
35+
#[arg(long, default_value = "tpch")]
36+
dataset: String,
37+
38+
/// Scale factor for data generation
39+
#[arg(long, default_value_t = 1.0)]
40+
scale_factor: f64,
41+
42+
/// Number of data generation steps (partitions)
43+
#[arg(long, default_value_t = 25)]
44+
num_steps: u16,
45+
46+
/// S3 bucket name (used for both source and target)
47+
#[arg(long)]
48+
bucket: String,
49+
50+
/// S3 key prefix for source data
51+
#[arg(long, default_value = "")]
52+
source_prefix: String,
53+
/// AWS region
54+
#[arg(long)]
55+
region: Option<String>,
56+
57+
/// S3 endpoint URL (for MinIO/LocalStack)
58+
#[arg(long)]
59+
endpoint: Option<String>,
60+
61+
/// ADBC driver name (for example: databricks, flightsql)
62+
#[arg(long)]
63+
adbc_driver: String,
64+
65+
/// ADBC connection URI passed as db option `uri`
66+
#[arg(long)]
67+
adbc_uri: String,
68+
69+
/// Optional schema name to prefix destination table names
70+
#[arg(long)]
71+
adbc_schema: Option<String>,
72+
73+
/// Every N steps to take a checkpoint
74+
#[arg(long, default_value_t = 100)]
75+
checkpoint_interval_steps: u64,
76+
}
77+
78+
impl Cli {
79+
fn dataset_source(&self) -> anyhow::Result<DatasetSource> {
80+
match self.dataset.as_str() {
81+
"tpch" => Ok(DatasetSource::Tpch),
82+
"simple_sequence" => Ok(DatasetSource::SimpleSequence),
83+
other => {
84+
anyhow::bail!("Unknown dataset type: {other}. Use 'tpch' or 'simple_sequence'.")
85+
}
86+
}
87+
}
88+
89+
fn dataset_config(&self) -> DatasetConfig {
90+
DatasetConfig {
91+
dataset_type: self.dataset.clone(),
92+
scale_factor: self.scale_factor,
93+
num_steps: self.num_steps,
94+
}
95+
}
96+
97+
fn source_config(&self) -> TargetConfig {
98+
TargetConfig {
99+
bucket: self.bucket.clone(),
100+
prefix: self.source_prefix.clone(),
101+
region: self.region.clone(),
102+
endpoint: self.endpoint.clone(),
103+
}
104+
}
105+
}
106+
107+
#[tokio::main]
108+
async fn main() -> anyhow::Result<()> {
109+
tracing_subscriber::fmt()
110+
.with_env_filter(EnvFilter::from_default_env())
111+
.init();
112+
113+
let cli = Cli::parse();
114+
115+
let dataset_source = cli.dataset_source()?;
116+
let dataset_config = cli.dataset_config();
117+
118+
let source = Arc::new(S3Storage::new(&cli.source_config())?);
119+
120+
let adbc_conn = AdbcConnection::create(
121+
&cli.adbc_driver,
122+
std::collections::HashMap::from([("uri".to_string(), Value::String(cli.adbc_uri.clone()))]),
123+
)?;
124+
let target = Arc::new(AdbcSink::new(adbc_conn, cli.adbc_schema.clone()));
125+
126+
let mutations = MutationConfig::new(0.1, 0.1);
127+
128+
let mut pipeline =
129+
ETLPipeline::new(dataset_source, &dataset_config, source, target, &mutations)?;
130+
131+
tracing::info!(
132+
dataset = %cli.dataset,
133+
bucket = %cli.bucket,
134+
source_prefix = %cli.source_prefix,
135+
adbc_driver = %cli.adbc_driver,
136+
adbc_schema = ?cli.adbc_schema,
137+
scale_factor = cli.scale_factor,
138+
num_steps = cli.num_steps,
139+
"Starting ETL pipeline"
140+
);
141+
142+
// Log the tables and schemas that will be processed.
143+
let datasets = pipeline.setup_request_datasets();
144+
for (name, config) in &datasets {
145+
tracing::info!(table = %name, schema = ?config.schema, "Dataset table registered");
146+
}
147+
148+
pipeline.initialize().await?;
149+
pipeline.run(cli.checkpoint_interval_steps as usize)?;
150+
151+
match pipeline.wait().await {
152+
PipelineState::Paused => {}
153+
PipelineState::Stopped(StopReason::Completed) => {
154+
tracing::info!("ETL pipeline completed successfully");
155+
}
156+
PipelineState::Stopped(StopReason::Cancelled) => {
157+
tracing::warn!("ETL pipeline was cancelled");
158+
}
159+
PipelineState::Stopped(StopReason::Error(e)) => {
160+
tracing::error!(error = %e, "ETL pipeline stopped with error");
161+
anyhow::bail!("ETL pipeline failed: {e}");
162+
}
163+
other => {
164+
anyhow::bail!("Unexpected final pipeline state: {other:?}");
165+
}
166+
}
167+
168+
// TODO: checkpoint the current state, continue the pipeline, and continue checkpointing until completion.
169+
170+
Ok(())
171+
}

0 commit comments

Comments
 (0)