Skip to content

Commit 554d4c6

Browse files
committed
Move client helpers to system-adapter-protocol
1 parent 2edd89a commit 554d4c6

4 files changed

Lines changed: 390 additions & 0 deletions

File tree

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/system-adapter-protocol/Cargo.toml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,5 +10,11 @@ repository.workspace = true
1010
serde = { workspace = true }
1111
serde_json = { workspace = true }
1212
uuid = { workspace = true, features = ["serde", "v4"] }
13+
tokio = { workspace = true, features = ["process", "io-util"] }
14+
reqwest = { workspace = true, optional = true }
15+
16+
[features]
17+
default = ["client"]
18+
client = ["reqwest"]
1319

1420
[dev-dependencies]
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
/*
2+
Copyright 2024-2025 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
//! Client implementations for system adapter JSON-RPC communication.
18+
19+
use crate::{methods, JsonRpcError, JsonRpcRequest, JsonRpcResponse};
20+
use serde::{de::DeserializeOwned, Serialize};
21+
use std::collections::HashMap;
22+
use tokio::{
23+
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
24+
process::{Child, ChildStdin, ChildStdout, Command},
25+
};
26+
27+
/// Result type for client operations
28+
pub type Result<T> = std::result::Result<T, ClientError>;
29+
30+
/// Errors that can occur during client operations
31+
#[derive(Debug)]
32+
pub enum ClientError {
33+
/// JSON-RPC error returned by the server
34+
JsonRpc(JsonRpcError),
35+
/// I/O error during communication
36+
Io(std::io::Error),
37+
/// JSON serialization/deserialization error
38+
Json(serde_json::Error),
39+
/// HTTP transport error
40+
#[cfg(feature = "client")]
41+
Http(reqwest::Error),
42+
/// Invalid response format
43+
InvalidResponse(String),
44+
/// Transport-specific error
45+
Transport(String),
46+
}
47+
48+
impl std::fmt::Display for ClientError {
49+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50+
match self {
51+
Self::JsonRpc(e) => write!(f, "JSON-RPC error: {}", e.message),
52+
Self::Io(e) => write!(f, "I/O error: {e}"),
53+
Self::Json(e) => write!(f, "JSON error: {e}"),
54+
#[cfg(feature = "client")]
55+
Self::Http(e) => write!(f, "HTTP error: {e}"),
56+
Self::InvalidResponse(msg) => write!(f, "Invalid response: {msg}"),
57+
Self::Transport(msg) => write!(f, "Transport error: {msg}"),
58+
}
59+
}
60+
}
61+
62+
impl std::error::Error for ClientError {}
63+
64+
impl From<std::io::Error> for ClientError {
65+
fn from(e: std::io::Error) -> Self {
66+
Self::Io(e)
67+
}
68+
}
69+
70+
impl From<serde_json::Error> for ClientError {
71+
fn from(e: serde_json::Error) -> Self {
72+
Self::Json(e)
73+
}
74+
}
75+
76+
#[cfg(feature = "client")]
77+
impl From<reqwest::Error> for ClientError {
78+
fn from(e: reqwest::Error) -> Self {
79+
Self::Http(e)
80+
}
81+
}
82+
83+
/// System adapter client for JSON-RPC communication
84+
pub enum Client {
85+
/// Stdio transport - communicate via stdin/stdout with a child process
86+
Stdio {
87+
_child: Box<Child>,
88+
stdin: ChildStdin,
89+
stdout: BufReader<ChildStdout>,
90+
},
91+
/// HTTP transport - communicate via HTTP POST requests
92+
#[cfg(feature = "client")]
93+
Http {
94+
client: reqwest::Client,
95+
endpoint: String,
96+
},
97+
}
98+
99+
impl Client {
100+
/// Create a client using stdio transport by spawning a command
101+
pub fn stdio(
102+
command: impl AsRef<str>,
103+
args: Vec<String>,
104+
env: HashMap<String, String>,
105+
) -> Result<Self> {
106+
let command_str = command.as_ref();
107+
let mut cmd = Command::new(command_str);
108+
109+
cmd.args(args);
110+
for (key, value) in env {
111+
cmd.env(key, value);
112+
}
113+
114+
cmd.stdin(std::process::Stdio::piped())
115+
.stdout(std::process::Stdio::piped())
116+
.stderr(std::process::Stdio::inherit());
117+
118+
let mut child = cmd.spawn().map_err(|e| {
119+
ClientError::Transport(format!(
120+
"Failed to start stdio command '{command_str}': {e}"
121+
))
122+
})?;
123+
124+
let stdin = child.stdin.take().ok_or_else(|| {
125+
ClientError::Transport("Stdio child missing stdin".to_string())
126+
})?;
127+
let stdout = child.stdout.take().ok_or_else(|| {
128+
ClientError::Transport("Stdio child missing stdout".to_string())
129+
})?;
130+
131+
Ok(Self::Stdio {
132+
_child: Box::new(child),
133+
stdin,
134+
stdout: BufReader::new(stdout),
135+
})
136+
}
137+
138+
/// Create a client using HTTP transport
139+
#[cfg(feature = "client")]
140+
pub fn http(endpoint: impl Into<String>) -> Self {
141+
Self::Http {
142+
client: reqwest::Client::new(),
143+
endpoint: endpoint.into(),
144+
}
145+
}
146+
147+
/// Get the transport name
148+
pub fn transport_name(&self) -> &'static str {
149+
match self {
150+
Self::Stdio { .. } => "stdio",
151+
#[cfg(feature = "client")]
152+
Self::Http { .. } => "http",
153+
}
154+
}
155+
156+
/// Query available RPC methods from the server
157+
pub async fn rpc_methods(&mut self) -> Result<Vec<String>> {
158+
let request = JsonRpcRequest::new(1, methods::RPC_METHODS, serde_json::json!({}));
159+
let response: JsonRpcResponse<serde_json::Value> = self.call_typed(request).await?;
160+
161+
let methods = response
162+
.result
163+
.as_ref()
164+
.and_then(|v| v.get("methods"))
165+
.and_then(|v| v.as_array())
166+
.ok_or_else(|| {
167+
ClientError::InvalidResponse(
168+
"Response missing result.methods array".to_string(),
169+
)
170+
})?
171+
.iter()
172+
.filter_map(|v| v.as_str().map(ToString::to_string))
173+
.collect();
174+
175+
Ok(methods)
176+
}
177+
178+
/// Make a typed JSON-RPC call with request and response types
179+
pub async fn call_typed<Req: Serialize, Resp: DeserializeOwned>(
180+
&mut self,
181+
request: JsonRpcRequest<Req>,
182+
) -> Result<JsonRpcResponse<Resp>> {
183+
let request_value = serde_json::to_value(request)?;
184+
let response_value = self.call_raw(request_value).await?;
185+
let response: JsonRpcResponse<Resp> = serde_json::from_value(response_value)?;
186+
187+
if let Some(error) = response.error {
188+
return Err(ClientError::JsonRpc(error));
189+
}
190+
191+
Ok(response)
192+
}
193+
194+
/// Make a raw JSON-RPC call with serde_json::Value
195+
pub async fn call_raw(&mut self, request: serde_json::Value) -> Result<serde_json::Value> {
196+
match self {
197+
Self::Stdio {
198+
_child: _,
199+
stdin,
200+
stdout,
201+
} => {
202+
let payload = serde_json::to_string(&request)?;
203+
stdin.write_all(payload.as_bytes()).await?;
204+
stdin.write_all(b"\n").await?;
205+
stdin.flush().await?;
206+
207+
let mut line = String::new();
208+
let read = stdout.read_line(&mut line).await?;
209+
if read == 0 {
210+
return Err(ClientError::Transport(
211+
"Stdio process closed stdout before responding".to_string(),
212+
));
213+
}
214+
215+
let response: serde_json::Value = serde_json::from_str(line.trim_end())?;
216+
if let Some(error) = response.get("error") {
217+
let error: JsonRpcError = serde_json::from_value(error.clone())?;
218+
return Err(ClientError::JsonRpc(error));
219+
}
220+
Ok(response)
221+
}
222+
#[cfg(feature = "client")]
223+
Self::Http { client, endpoint } => {
224+
let response = client
225+
.post(endpoint.as_str())
226+
.json(&request)
227+
.send()
228+
.await
229+
.map_err(|e| {
230+
ClientError::Transport(format!(
231+
"Failed to POST to {endpoint}: {e}"
232+
))
233+
})?;
234+
235+
let status = response.status();
236+
let value: serde_json::Value = response
237+
.json()
238+
.await
239+
.map_err(|e| {
240+
ClientError::Transport(format!(
241+
"Failed to parse response body (status {status}): {e}"
242+
))
243+
})?;
244+
245+
if let Some(error) = value.get("error") {
246+
let error: JsonRpcError = serde_json::from_value(error.clone())?;
247+
return Err(ClientError::JsonRpc(error));
248+
}
249+
Ok(value)
250+
}
251+
}
252+
}
253+
}
254+
255+
/// Builder for creating a `Client` with various configuration options
256+
pub struct ClientBuilder {
257+
transport: TransportConfig,
258+
}
259+
260+
enum TransportConfig {
261+
Stdio {
262+
command: String,
263+
args: Vec<String>,
264+
env: HashMap<String, String>,
265+
},
266+
#[cfg(feature = "client")]
267+
Http {
268+
endpoint: String,
269+
},
270+
}
271+
272+
impl ClientBuilder {
273+
/// Create a builder for stdio transport
274+
pub fn stdio(command: impl Into<String>) -> Self {
275+
Self {
276+
transport: TransportConfig::Stdio {
277+
command: command.into(),
278+
args: Vec::new(),
279+
env: HashMap::new(),
280+
},
281+
}
282+
}
283+
284+
/// Create a builder for HTTP transport
285+
#[cfg(feature = "client")]
286+
pub fn http(endpoint: impl Into<String>) -> Self {
287+
Self {
288+
transport: TransportConfig::Http {
289+
endpoint: endpoint.into(),
290+
},
291+
}
292+
}
293+
294+
/// Add command-line arguments (stdio only)
295+
pub fn with_args(mut self, args: Vec<String>) -> Self {
296+
if let TransportConfig::Stdio { args: ref mut a, .. } = self.transport {
297+
*a = args;
298+
}
299+
self
300+
}
301+
302+
/// Add environment variables (stdio only)
303+
pub fn with_env(mut self, env: HashMap<String, String>) -> Self {
304+
if let TransportConfig::Stdio { env: ref mut e, .. } = self.transport {
305+
*e = env;
306+
}
307+
self
308+
}
309+
310+
/// Build the client
311+
pub fn build(self) -> Result<Client> {
312+
match self.transport {
313+
TransportConfig::Stdio { command, args, env } => Client::stdio(command, args, env),
314+
#[cfg(feature = "client")]
315+
TransportConfig::Http { endpoint } => Ok(Client::http(endpoint)),
316+
}
317+
}
318+
}
319+
320+
#[cfg(test)]
321+
mod tests {
322+
use super::*;
323+
324+
#[test]
325+
fn test_client_error_display() {
326+
let error = ClientError::Transport("test error".to_string());
327+
assert_eq!(format!("{error}"), "Transport error: test error");
328+
}
329+
330+
#[test]
331+
fn test_builder_stdio() {
332+
let builder = ClientBuilder::stdio("python");
333+
assert!(matches!(builder.transport, TransportConfig::Stdio { .. }));
334+
}
335+
336+
#[cfg(feature = "client")]
337+
#[test]
338+
fn test_builder_http() {
339+
let builder = ClientBuilder::http("http://localhost:8080");
340+
assert!(matches!(builder.transport, TransportConfig::Http { .. }));
341+
}
342+
}

0 commit comments

Comments
 (0)