Skip to content

Commit 708465d

Browse files
authored
Fix http connector (spiceai#9818)
1 parent 8cb9c38 commit 708465d

3 files changed

Lines changed: 326 additions & 12 deletions

File tree

crates/runtime/src/dataconnector/https.rs

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ impl Https {
7474
| "tsv"
7575
| "arrow"
7676
| "avro"
77-
| "json"
7877
| "jsonl"
7978
| "ndjson"
8079
| "ldjson"
@@ -84,6 +83,13 @@ impl Https {
8483
return true;
8584
}
8685

86+
// JSON format is structured only for static file endpoints.
87+
// Dynamic API endpoints (request path allow-list set, or query/body filters enabled)
88+
// should use HttpTableProvider instead.
89+
if file_format == "json" && !self.has_dynamic_api_params() {
90+
return true;
91+
}
92+
8793
// If file_format is "auto", try to detect from URL extension
8894
if file_format == "auto"
8995
&& let Ok(url) = Url::parse(&dataset.from)
@@ -96,22 +102,47 @@ impl Https {
96102
.map(str::to_ascii_lowercase)
97103
.unwrap_or_default();
98104

99-
return matches!(
105+
if matches!(
100106
extension.as_str(),
101-
"parquet"
102-
| "csv"
103-
| "tsv"
104-
| "arrow"
105-
| "avro"
106-
| "json"
107-
| "jsonl"
108-
| "ndjson"
109-
| "ldjson"
110-
);
107+
"parquet" | "csv" | "tsv" | "arrow" | "avro" | "jsonl" | "ndjson" | "ldjson"
108+
) {
109+
return true;
110+
}
111+
112+
if extension == "json" && !self.has_dynamic_api_params() {
113+
return true;
114+
}
111115
}
112116

113117
false
114118
}
119+
120+
/// Returns true if the connector is configured with parameters that indicate
121+
/// a dynamic HTTP API endpoint (as opposed to a static file download).
122+
fn has_dynamic_api_params(&self) -> bool {
123+
let has_allowed_paths = self
124+
.params
125+
.get("allowed_request_paths")
126+
.expose()
127+
.ok()
128+
.is_some_and(|v| !v.is_empty());
129+
130+
let has_query_filters = self
131+
.params
132+
.get("request_query_filters")
133+
.expose()
134+
.ok()
135+
.is_some_and(util::parse_enabled);
136+
137+
let has_body_filters = self
138+
.params
139+
.get("request_body_filters")
140+
.expose()
141+
.ok()
142+
.is_some_and(util::parse_enabled);
143+
144+
has_allowed_paths || has_query_filters || has_body_filters
145+
}
115146
}
116147

117148
struct HttpProviderParams {

crates/runtime/tests/http/mod.rs

Lines changed: 282 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,282 @@
1+
/*
2+
Copyright 2026 The Spice.ai OSS Authors
3+
4+
Licensed under the Apache License, Version 2.0 (the "License");
5+
you may not use this file except in compliance with the License.
6+
You may obtain a copy of the License at
7+
8+
https://www.apache.org/licenses/LICENSE-2.0
9+
10+
Unless required by applicable law or agreed to in writing, software
11+
distributed under the License is distributed on an "AS IS" BASIS,
12+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
See the License for the specific language governing permissions and
14+
limitations under the License.
15+
*/
16+
17+
use std::collections::HashMap;
18+
use std::net::SocketAddr;
19+
use std::sync::Arc;
20+
21+
use app::AppBuilder;
22+
use arrow::array::RecordBatch;
23+
use axum::{Router, routing::get};
24+
use runtime::Runtime;
25+
use spicepod::{component::dataset::Dataset, param::Params as DatasetParams};
26+
use tokio::net::TcpListener;
27+
28+
use crate::utils::{register_test_connectors, test_request_context};
29+
use crate::{ValidateFn, configure_test_datafusion, init_tracing, run_query_and_check_results};
30+
31+
const SHOWS_JSON: &str = r#"[
32+
{"id": 1, "name": "Breaking Bad", "rating": 9.5},
33+
{"id": 2, "name": "The Wire", "rating": 9.3},
34+
{"id": 3, "name": "Better Call Saul", "rating": 8.9}
35+
]"#;
36+
37+
const ITEMS_CSV: &str = "id,name,price\n1,Widget,9.99\n2,Gadget,19.99\n3,Doohickey,4.99\n";
38+
39+
async fn start_http_server() -> Result<(tokio::sync::oneshot::Sender<()>, SocketAddr), String> {
40+
let (tx, rx) = tokio::sync::oneshot::channel::<()>();
41+
42+
let app = Router::new()
43+
.route(
44+
"/api/shows",
45+
get(|| async { ([("content-type", "application/json")], SHOWS_JSON) }),
46+
)
47+
.route(
48+
"/api/shows/{id}",
49+
get(
50+
|axum::extract::Path(id): axum::extract::Path<u32>| async move {
51+
let show = match id {
52+
1 => r#"{"id": 1, "name": "Breaking Bad", "rating": 9.5}"#,
53+
2 => r#"{"id": 2, "name": "The Wire", "rating": 9.3}"#,
54+
3 => r#"{"id": 3, "name": "Better Call Saul", "rating": 8.9}"#,
55+
_ => return (axum::http::StatusCode::NOT_FOUND, "Not found".to_string()),
56+
};
57+
(axum::http::StatusCode::OK, show.to_string())
58+
},
59+
),
60+
)
61+
.route(
62+
"/data/shows.json",
63+
get(|| async { ([("content-type", "application/json")], SHOWS_JSON) }),
64+
)
65+
.route(
66+
"/data/items.csv",
67+
get(|| async { ([("content-type", "text/csv")], ITEMS_CSV) }),
68+
);
69+
70+
let tcp_listener = TcpListener::bind("127.0.0.1:0").await.map_err(|e| {
71+
tracing::error!("Failed to bind to address: {e}");
72+
e.to_string()
73+
})?;
74+
let addr = tcp_listener.local_addr().map_err(|e| {
75+
tracing::error!("Failed to get local address: {e}");
76+
e.to_string()
77+
})?;
78+
79+
tokio::spawn(async move {
80+
axum::serve(tcp_listener, app)
81+
.with_graceful_shutdown(async {
82+
rx.await.ok();
83+
})
84+
.await
85+
.unwrap_or_default();
86+
});
87+
88+
Ok((tx, addr))
89+
}
90+
91+
/// Test that a dynamic JSON API endpoint (with `allowed_request_paths` and
92+
/// `request_query_filters`) routes through `HttpTableProvider`, not
93+
/// `HttpListingConnector`. This was a regression introduced by adding "json"
94+
/// to the structured formats list.
95+
#[tokio::test]
96+
async fn test_http_json_api_dynamic() -> Result<(), String> {
97+
type QueryTests<'a> = Vec<(&'a str, &'a str, Option<Box<ValidateFn>>)>;
98+
let _tracing = init_tracing(Some("integration=debug,info"));
99+
register_test_connectors().await;
100+
101+
test_request_context()
102+
.scope(async {
103+
let (tx, addr) = start_http_server().await?;
104+
tracing::debug!("HTTP test server started at {addr}");
105+
106+
let mut dataset = Dataset::new(format!("http://{addr}/api"), "shows");
107+
dataset.params = Some(DatasetParams::from_string_map(HashMap::from([
108+
("file_format".to_string(), "json".to_string()),
109+
(
110+
"allowed_request_paths".to_string(),
111+
"/shows,/shows/*".to_string(),
112+
),
113+
("request_query_filters".to_string(), "enabled".to_string()),
114+
])));
115+
116+
let app = AppBuilder::new("http_dynamic_test")
117+
.with_dataset(dataset)
118+
.build();
119+
120+
configure_test_datafusion();
121+
let mut rt = Runtime::builder().with_app(app).build().await;
122+
123+
let cloned_rt = Arc::new(rt.clone());
124+
tokio::select! {
125+
() = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
126+
return Err("Timed out waiting for datasets to load".to_string());
127+
}
128+
() = cloned_rt.load_components() => {}
129+
}
130+
131+
let queries: QueryTests = vec![(
132+
"SELECT request_path, content FROM shows WHERE request_path = '/shows'",
133+
"http_dynamic_json_api",
134+
Some(Box::new(|result_batches| {
135+
let total_rows: usize = result_batches.iter().map(RecordBatch::num_rows).sum();
136+
assert!(
137+
total_rows > 0,
138+
"expected at least one row, got {total_rows}"
139+
);
140+
})),
141+
)];
142+
143+
for (query, snapshot_suffix, validate_result) in queries {
144+
run_query_and_check_results(
145+
&mut rt,
146+
snapshot_suffix,
147+
query,
148+
false,
149+
validate_result,
150+
)
151+
.await?;
152+
}
153+
154+
tx.send(())
155+
.map_err(|()| "Failed to send shutdown signal".to_string())?;
156+
Ok(())
157+
})
158+
.await
159+
}
160+
161+
/// Test that a static JSON file endpoint (without dynamic API params) correctly
162+
/// routes through `HttpListingConnector`.
163+
#[tokio::test]
164+
async fn test_http_json_static_file() -> Result<(), String> {
165+
type QueryTests<'a> = Vec<(&'a str, &'a str, Option<Box<ValidateFn>>)>;
166+
let _tracing = init_tracing(Some("integration=debug,info"));
167+
register_test_connectors().await;
168+
169+
test_request_context()
170+
.scope(async {
171+
let (tx, addr) = start_http_server().await?;
172+
tracing::debug!("HTTP test server started at {addr}");
173+
174+
let mut dataset =
175+
Dataset::new(format!("http://{addr}/data/shows.json"), "shows_static");
176+
dataset.params = Some(DatasetParams::from_string_map(HashMap::from([(
177+
"file_format".to_string(),
178+
"json".to_string(),
179+
)])));
180+
181+
let app = AppBuilder::new("http_static_json_test")
182+
.with_dataset(dataset)
183+
.build();
184+
185+
configure_test_datafusion();
186+
let mut rt = Runtime::builder().with_app(app).build().await;
187+
188+
let cloned_rt = Arc::new(rt.clone());
189+
tokio::select! {
190+
() = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
191+
return Err("Timed out waiting for datasets to load".to_string());
192+
}
193+
() = cloned_rt.load_components() => {}
194+
}
195+
196+
let queries: QueryTests = vec![(
197+
"SELECT * FROM shows_static",
198+
"http_static_json_file",
199+
Some(Box::new(|result_batches| {
200+
let total_rows: usize = result_batches.iter().map(RecordBatch::num_rows).sum();
201+
assert_eq!(total_rows, 3, "expected 3 rows, got {total_rows}");
202+
})),
203+
)];
204+
205+
for (query, snapshot_suffix, validate_result) in queries {
206+
run_query_and_check_results(
207+
&mut rt,
208+
snapshot_suffix,
209+
query,
210+
false,
211+
validate_result,
212+
)
213+
.await?;
214+
}
215+
216+
tx.send(())
217+
.map_err(|()| "Failed to send shutdown signal".to_string())?;
218+
Ok(())
219+
})
220+
.await
221+
}
222+
223+
/// Test that a CSV file served over HTTP correctly routes through
224+
/// `HttpListingConnector` (structured format, always).
225+
#[tokio::test]
226+
async fn test_http_csv_static_file() -> Result<(), String> {
227+
type QueryTests<'a> = Vec<(&'a str, &'a str, Option<Box<ValidateFn>>)>;
228+
let _tracing = init_tracing(Some("integration=debug,info"));
229+
register_test_connectors().await;
230+
231+
test_request_context()
232+
.scope(async {
233+
let (tx, addr) = start_http_server().await?;
234+
tracing::debug!("HTTP test server started at {addr}");
235+
236+
let mut dataset = Dataset::new(format!("http://{addr}/data/items.csv"), "items");
237+
dataset.params = Some(DatasetParams::from_string_map(HashMap::from([(
238+
"file_format".to_string(),
239+
"csv".to_string(),
240+
)])));
241+
242+
let app = AppBuilder::new("http_csv_test")
243+
.with_dataset(dataset)
244+
.build();
245+
246+
configure_test_datafusion();
247+
let mut rt = Runtime::builder().with_app(app).build().await;
248+
249+
let cloned_rt = Arc::new(rt.clone());
250+
tokio::select! {
251+
() = tokio::time::sleep(std::time::Duration::from_secs(60)) => {
252+
return Err("Timed out waiting for datasets to load".to_string());
253+
}
254+
() = cloned_rt.load_components() => {}
255+
}
256+
257+
let queries: QueryTests = vec![(
258+
"SELECT * FROM items",
259+
"http_csv_static_file",
260+
Some(Box::new(|result_batches| {
261+
let total_rows: usize = result_batches.iter().map(RecordBatch::num_rows).sum();
262+
assert_eq!(total_rows, 3, "expected 3 rows, got {total_rows}");
263+
})),
264+
)];
265+
266+
for (query, snapshot_suffix, validate_result) in queries {
267+
run_query_and_check_results(
268+
&mut rt,
269+
snapshot_suffix,
270+
query,
271+
false,
272+
validate_result,
273+
)
274+
.await?;
275+
}
276+
277+
tx.send(())
278+
.map_err(|()| "Failed to send shutdown signal".to_string())?;
279+
Ok(())
280+
})
281+
.await
282+
}

crates/runtime/tests/integration.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@ mod gcs;
6969
mod github;
7070
mod glue;
7171
mod graphql;
72+
mod http;
7273
mod iceberg;
7374
mod iceberg_api;
7475
mod json;

0 commit comments

Comments
 (0)