Skip to content

Commit cd81af4

Browse files
authored
Add support for system adapter execution modes and JSON-RPC dispatching (#8)
* Add support for system adapter execution modes and JSON-RPC dispatching * Refactor code formatting and improve readability in mod.rs
1 parent 63a6e92 commit cd81af4

4 files changed

Lines changed: 199 additions & 19 deletions

File tree

README.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,11 @@ To benchmark a new platform, implement the adapter interface:
172172

173173
- **stdio transport**: use `--system-adapter-stdio-cmd` (spicebench starts the child process).
174174
- **HTTP transport**: use `--system-adapter-http-url` (spicebench connects to a remote adapter endpoint).
175+
- **execution mode**: use `--system-adapter-execution-mode`:
176+
- `adapter-command` (default): dispatches `spicebench run ...` to adapter JSON-RPC `run.load`
177+
- `direct-query`: spicebench runs load/query path directly, while still connecting to adapter
178+
179+
When adapter transport is configured, `spicebench run ...` is dispatched to spidapter method `run.load` over JSON-RPC and the adapter's `stdout`/`stderr` are streamed back.
175180

176181
#### Stdio example (child process started by spicebench)
177182

@@ -202,7 +207,18 @@ Notes:
202207
- Set **exactly one** of `--system-adapter-stdio-cmd` or `--system-adapter-http-url`.
203208
- `--system-adapter-stdio-args` passes CLI args to the stdio adapter command.
204209
- `--system-adapter-env` is only valid for stdio transport.
205-
- On connection, spicebench issues JSON-RPC `rpc.methods` to verify the adapter is reachable.
210+
- Spicebench validates the adapter supports JSON-RPC method `run.load` before dispatch.
211+
212+
#### Direct-query example (run in spicebench, keep adapter connected)
213+
214+
```bash
215+
spicebench run \
216+
--query-set tpch \
217+
--spicepod-path ./spicepod.yaml \
218+
--system-adapter-name spidapter \
219+
--system-adapter-execution-mode direct-query \
220+
--system-adapter-http-url http://127.0.0.1:8080/jsonrpc
221+
```
206222

207223
## License
208224

src/args/mod.rs

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ limitations under the License.
1616

1717
use std::path::PathBuf;
1818

19-
use clap::{ArgAction, Parser, Subcommand};
19+
use clap::{ArgAction, Parser, Subcommand, ValueEnum};
2020

2121
mod dataset;
2222
pub use dataset::{DatasetTestArgs, LoadTestArgs};
@@ -82,6 +82,12 @@ pub struct CommonArgs {
8282
#[arg(long, default_value = "system_adapter")]
8383
pub(crate) system_adapter_name: String,
8484

85+
/// How to execute when a system adapter transport is configured.
86+
/// - adapter-command: dispatch spicebench run as a JSON-RPC command (e.g. run.load)
87+
/// - direct-query: execute load/query path in spicebench directly
88+
#[arg(long, value_enum, default_value = "adapter-command")]
89+
pub(crate) system_adapter_execution_mode: SystemAdapterExecutionMode,
90+
8591
/// Command to run for a stdio JSON-RPC system adapter.
8692
#[arg(long, conflicts_with = "system_adapter_http_url")]
8793
pub(crate) system_adapter_stdio_cmd: Option<String>,
@@ -124,3 +130,9 @@ fn parse_key_val(s: &str) -> Result<(String, String), String> {
124130
.ok_or_else(|| "expected KEY=VALUE formatted header".to_string())?;
125131
Ok((s[..pos].to_string(), s[pos + 1..].to_string()))
126132
}
133+
134+
#[derive(Debug, Clone, Copy, PartialEq, Eq, ValueEnum)]
135+
pub enum SystemAdapterExecutionMode {
136+
AdapterCommand,
137+
DirectQuery,
138+
}

src/commands/mod.rs

Lines changed: 159 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,13 @@ See the License for the specific language governing permissions and
1414
limitations under the License.
1515
*/
1616

17-
use std::{collections::BTreeMap, sync::Arc, time::Duration};
17+
use std::{
18+
collections::{BTreeMap, HashMap},
19+
sync::Arc,
20+
time::Duration,
21+
};
1822

19-
use crate::args::{CommonArgs, DatasetTestArgs};
23+
use crate::args::{CommonArgs, DatasetTestArgs, SystemAdapterExecutionMode};
2024
use test_framework::{
2125
anyhow,
2226
anyhow::Context,
@@ -31,7 +35,7 @@ use test_framework::{
3135
};
3236
use tokio::{
3337
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
34-
process::{ChildStdin, ChildStdout, Command},
38+
process::{Child, ChildStdin, ChildStdout, Command},
3539
};
3640

3741
pub(crate) mod load;
@@ -102,16 +106,6 @@ pub(crate) async fn build_test_with_validation(
102106
pub(crate) async fn run_or_connect_spiced(
103107
args: &CommonArgs,
104108
) -> anyhow::Result<(App, SpicedInstance)> {
105-
if let Some(mut adapter) = SystemAdapterClient::connect(args).await? {
106-
let methods = adapter.rpc_methods().await?;
107-
println!(
108-
"Connected to system adapter '{}' via {} ({} methods)",
109-
args.system_adapter_name,
110-
adapter.transport_name(),
111-
methods.len()
112-
);
113-
}
114-
115109
let (app, mut instance) = if args.is_external_instance() {
116110
println!(
117111
"Connecting to external spiced instance at: {}",
@@ -170,8 +164,154 @@ pub(crate) async fn get_app_and_start_request(
170164
Ok((app, start_request))
171165
}
172166

167+
pub(crate) async fn maybe_dispatch_run_to_system_adapter(
168+
raw_cli_args: &[String],
169+
common_args: &CommonArgs,
170+
) -> anyhow::Result<bool> {
171+
if !has_system_adapter_transport(common_args) {
172+
return Ok(false);
173+
}
174+
175+
let mut adapter = SystemAdapterClient::connect(common_args)
176+
.await?
177+
.context("System adapter transport was configured but could not be initialized")?;
178+
179+
let methods = adapter.rpc_methods().await?;
180+
181+
if common_args.system_adapter_execution_mode == SystemAdapterExecutionMode::DirectQuery {
182+
println!(
183+
"Connected to system adapter '{}' via {} in direct-query mode (spicebench executes query/load path directly)",
184+
common_args.system_adapter_name,
185+
adapter.transport_name(),
186+
);
187+
return Ok(false);
188+
}
189+
190+
let Some(method) = resolve_system_adapter_method(raw_cli_args) else {
191+
anyhow::bail!(
192+
"No JSON-RPC adapter method mapping for current command invocation: {:?}",
193+
raw_cli_args
194+
);
195+
};
196+
197+
if !methods.iter().any(|available| available == method) {
198+
anyhow::bail!(
199+
"System adapter '{}' via {} does not support required method '{method}'",
200+
common_args.system_adapter_name,
201+
adapter.transport_name(),
202+
);
203+
}
204+
205+
let adapter_args = adapter_cli_args_for_run(raw_cli_args);
206+
let mut params = serde_json::Map::new();
207+
params.insert("args".to_string(), serde_json::to_value(adapter_args)?);
208+
209+
for (key, value) in &common_args.system_adapter_param {
210+
params.insert(key.clone(), serde_json::Value::String(value.clone()));
211+
}
212+
213+
let request = serde_json::json!({
214+
"jsonrpc": "2.0",
215+
"id": 2,
216+
"method": method,
217+
"params": serde_json::Value::Object(params),
218+
});
219+
220+
let response = adapter.call(request).await?;
221+
handle_adapter_execution_response(&response)?;
222+
223+
Ok(true)
224+
}
225+
226+
fn resolve_system_adapter_method(raw_cli_args: &[String]) -> Option<&'static str> {
227+
match raw_cli_args.first().map(String::as_str) {
228+
Some("run") => Some("run.load"),
229+
_ => None,
230+
}
231+
}
232+
233+
fn has_system_adapter_transport(args: &CommonArgs) -> bool {
234+
args.system_adapter_stdio_cmd.is_some() || args.system_adapter_http_url.is_some()
235+
}
236+
237+
fn adapter_cli_args_for_run(raw_cli_args: &[String]) -> Vec<String> {
238+
let mut filtered = Vec::new();
239+
let mut skip_next = false;
240+
241+
for (index, arg) in raw_cli_args.iter().enumerate() {
242+
if index == 0 && arg == "run" {
243+
continue;
244+
}
245+
246+
if skip_next {
247+
skip_next = false;
248+
continue;
249+
}
250+
251+
let takes_value = [
252+
"--system-adapter-name",
253+
"--system-adapter-execution-mode",
254+
"--system-adapter-stdio-cmd",
255+
"--system-adapter-stdio-args",
256+
"--system-adapter-http-url",
257+
"--system-adapter-param",
258+
"--system-adapter-env",
259+
];
260+
261+
if takes_value.contains(&arg.as_str()) {
262+
skip_next = true;
263+
continue;
264+
}
265+
266+
if takes_value
267+
.iter()
268+
.any(|flag| arg.starts_with(&format!("{flag}=")))
269+
{
270+
continue;
271+
}
272+
273+
filtered.push(arg.clone());
274+
}
275+
276+
filtered
277+
}
278+
279+
fn handle_adapter_execution_response(response: &serde_json::Value) -> anyhow::Result<()> {
280+
let result = response
281+
.get("result")
282+
.context("System adapter response missing JSON-RPC result payload")?;
283+
284+
if let Some(stdout) = result.get("stdout").and_then(|v| v.as_str())
285+
&& !stdout.is_empty()
286+
{
287+
print!("{stdout}");
288+
}
289+
290+
if let Some(stderr) = result.get("stderr").and_then(|v| v.as_str())
291+
&& !stderr.is_empty()
292+
{
293+
eprint!("{stderr}");
294+
}
295+
296+
let success = result
297+
.get("success")
298+
.and_then(|v| v.as_bool())
299+
.unwrap_or(false);
300+
let exit_code = result
301+
.get("exit_code")
302+
.and_then(|v| v.as_i64())
303+
.unwrap_or(1);
304+
305+
if !success || exit_code != 0 {
306+
anyhow::bail!("System adapter command failed (success={success}, exit_code={exit_code})");
307+
}
308+
309+
Ok(())
310+
}
311+
173312
enum SystemAdapterClient {
174313
Stdio {
314+
child: Child,
175315
stdin: ChildStdin,
176316
stdout: BufReader<ChildStdout>,
177317
},
@@ -238,9 +378,8 @@ impl SystemAdapterClient {
238378
.take()
239379
.context("System adapter stdio child missing stdout")?;
240380

241-
std::mem::forget(child);
242-
243381
return Ok(Some(Self::Stdio {
382+
child,
244383
stdin,
245384
stdout: BufReader::new(stdout),
246385
}));
@@ -283,7 +422,11 @@ impl SystemAdapterClient {
283422

284423
async fn call(&mut self, request: serde_json::Value) -> anyhow::Result<serde_json::Value> {
285424
match self {
286-
Self::Stdio { stdin, stdout } => {
425+
Self::Stdio {
426+
child: _,
427+
stdin,
428+
stdout,
429+
} => {
287430
let payload = serde_json::to_string(&request)?;
288431
stdin.write_all(payload.as_bytes()).await?;
289432
stdin.write_all(b"\n").await?;

src/main.rs

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,10 +37,19 @@ async fn main() -> anyhow::Result<()> {
3737
let _ = rustls::crypto::CryptoProvider::install_default(
3838
rustls::crypto::aws_lc_rs::default_provider(),
3939
);
40+
let raw_cli_args: Vec<String> = std::env::args().skip(1).collect();
4041
let cli = Cli::parse();
4142

4243
match cli.subcommand {
43-
Commands::Run(args) => commands::load::run(&args).await?,
44+
Commands::Run(args) => {
45+
if commands::maybe_dispatch_run_to_system_adapter(&raw_cli_args, &args.test_args.common)
46+
.await?
47+
{
48+
return Ok(());
49+
}
50+
51+
commands::load::run(&args).await?
52+
}
4453
}
4554

4655
Ok(())

0 commit comments

Comments
 (0)