Commit a874314

Add schema decomposition to the HTTP connector (spiceai#10393)
* Add schema decomposition to HTTP connector

  Mirrors the DynamoDB connector's `json_object: "*"` column-metadata feature in the HTTP connector. When a dataset declares a set of `columns:` with exactly one column marked `metadata.json_object: "*"`, each JSON row returned by the endpoint is decomposed:

  - Declared static columns are projected as top-level Utf8 fields (nulls preserved for absent keys).
  - All remaining keys are gathered into the marked catch-all column as a sorted JSON object string.

  No data is silently dropped: every key lands in either a static column or the catch-all JSON.

  Also adds user-facing docs covering schema decomposition for both the DynamoDB and HTTP connectors with TVmaze examples.

* fix: address schema-decomposition PR review comments

  Round of Copilot review fixes on the HTTP schema-decomposition PR:

  - json_nest.rs: switch the static_fields filter to compare via `.as_str()` instead of the prior `**c != json_field_name`. Both forms compile, but the `.as_str()` form is clearer to readers.
  - provider.rs: in `From<Error> for DataFusionError`, box the outer `Error::JsonNesting { source }` so the variant's display context ("Failed to decompose HTTP response row into declared columns ...") travels with the `External` error instead of being stripped.
  - provider.rs: unit coverage for `create_batch_from_rows_nested` via `HttpExec`: object rows, missing keys, non-object rows, empty catch-all, empty-projection fallback, and dispatch through the non-nested entry point.
  - https.rs: unit coverage for `parse_http_json_nesting`: missing columns, missing marker, valid "*" marker, multiple markers, non-wildcard string, and non-string marker values.

* fix: satisfy clippy pedantic in schema-decomposition code

  - Backtick `DynamoDB` in two doc comments so clippy::doc_markdown passes.
  - Rename `marker` → `marker_value` inside `parse_http_json_nesting` so it no longer trips clippy::similar_names against the surrounding `marked` binding.

* perf: skip catch-all JSON build when projection excludes it

  Address Copilot follow-up review: when a query projection doesn't include the catch-all column, the previous implementation still decomposed every row into a HashMap and serialized the catch-all JSON object, which is wasted work that scales with row width.

  The nested batch builder now:

  - walks the projection once, detects whether the catch-all column is included, and writes values directly into per-column StringBuilders in a single pass instead of materializing a full decomposed row per input;
  - takes a fast path for object rows when the catch-all isn't projected: reads static fields straight out of the parsed JSON object and skips the HashMap + serialize step;
  - falls back to the existing decompose_json_row path when the catch-all is needed or the row isn't a JSON object, which keeps error propagation and non-object row handling identical.

  Added two tests: one locks down the fast path (narrow projection against wide rows) and one covers the fall-through on non-object rows when the catch-all isn't projected.
1 parent 3582d66 commit a874314
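
The `perf:` entry in the commit message describes skipping the catch-all build when the projection excludes it. The `provider.rs` changes are not shown on this page, so the following is only a std-only sketch of that idea under stated assumptions: `Row`, `build_catch_all`, and `project_rows` are hypothetical names, rows are assumed to be pre-parsed into a map of key to JSON text, and the fallback for non-object rows is left out. It is not the actual `HttpExec` batch builder.

```rust
use std::collections::{BTreeMap, HashSet};

/// Hypothetical pre-parsed row: key -> JSON text (`None` models JSON `null`).
type Row = BTreeMap<String, Option<String>>;

/// Serialize every non-static key into a sorted JSON object string.
/// Values are assumed to already be valid JSON text (sketch simplification).
fn build_catch_all(row: &Row, static_fields: &HashSet<String>) -> Option<String> {
    let parts: Vec<String> = row
        .iter()
        .filter(|(key, _)| !static_fields.contains(*key))
        .map(|(key, value)| format!("\"{}\":{}", key, value.as_deref().unwrap_or("null")))
        .collect();
    if parts.is_empty() {
        None
    } else {
        Some(format!("{{{}}}", parts.join(",")))
    }
}

/// Build one output column per projected name, in projection order.
fn project_rows(
    rows: &[Row],
    projection: &[&str],
    static_fields: &HashSet<String>,
    json_field: &str,
) -> Vec<Vec<Option<String>>> {
    // Walk the projection once to learn whether the catch-all is needed.
    let needs_catch_all = projection.iter().any(|c| *c == json_field);
    let mut columns: Vec<Vec<Option<String>>> = vec![Vec::new(); projection.len()];

    for row in rows {
        // Only pay for the catch-all serialization when it is projected.
        let catch_all = if needs_catch_all {
            build_catch_all(row, static_fields)
        } else {
            None
        };
        for (i, col) in projection.iter().enumerate() {
            let value = if *col == json_field {
                catch_all.clone()
            } else {
                // Absent keys and explicit nulls both become NULL.
                row.get(*col).cloned().flatten()
            };
            columns[i].push(value);
        }
    }
    columns
}

fn main() {
    let static_fields: HashSet<String> = ["id"].iter().map(|s| s.to_string()).collect();
    let mut row = Row::new();
    row.insert("id".to_string(), Some("1".to_string()));
    row.insert("runtime".to_string(), Some("60".to_string()));
    row.insert("ended".to_string(), None);
    let rows = vec![row];

    // Narrow projection: the catch-all is never serialized.
    println!("{:?}", project_rows(&rows, &["id"], &static_fields, "details"));
    // Wide projection: the catch-all is serialized once per row.
    println!("{:?}", project_rows(&rows, &["id", "details"], &static_fields, "details"));
}
```

The single `needs_catch_all` check up front is what makes the cost of a narrow projection independent of how many extra keys each row carries.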

5 files changed

Lines changed: 1044 additions & 2 deletions

Lines changed: 250 additions & 0 deletions
@@ -0,0 +1,250 @@
+/*
+Copyright 2024-2026 The Spice.ai OSS Authors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+     https://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+//! JSON schema decomposition for the HTTP connector.
+//!
+//! Mirrors the `DynamoDB` connector's `JsonNesting` feature so that users
+//! can declare a set of top-level static columns plus one catch-all JSON
+//! column via the spicepod `columns:` syntax:
+//!
+//! ```yaml
+//! datasets:
+//!   - from: https://api.tvmaze.com/shows
+//!     name: tvmaze_shows
+//!     columns:
+//!       - name: id
+//!       - name: name
+//!       - name: premiered
+//!       - name: details
+//!         metadata:
+//!           json_object: "*"
+//! ```
+//!
+//! Each JSON row returned by the HTTP endpoint is decomposed: every
+//! declared static field is projected as a top-level `Utf8` column, and
+//! all remaining keys are serialized into a JSON object string stored in
+//! the catch-all column.
+
+use snafu::{ResultExt, Snafu};
+use std::collections::{BTreeMap, HashMap, HashSet};
+
+#[derive(Debug, Snafu)]
+pub enum Error {
+    #[snafu(display("Failed to parse HTTP response row as JSON: {source}"))]
+    JsonParse { source: serde_json::Error },
+
+    #[snafu(display("Failed to serialize catch-all JSON column: {source}"))]
+    JsonSerialize { source: serde_json::Error },
+}
+
+pub type Result<T, E = Error> = std::result::Result<T, E>;
+
+/// Configuration for decomposing a JSON response row into a set of
+/// user-declared static columns plus a single catch-all JSON column.
+///
+/// Static fields are projected into their own top-level columns; every
+/// other JSON key in the response row is gathered into a sorted JSON
+/// object and stored (as a string) in the `json_field_name` column.
+#[derive(Debug, Clone)]
+pub struct HttpJsonNesting {
+    /// Column order exactly as declared in the spicepod, used to build
+    /// the table schema. Includes `json_field_name` in its declared
+    /// position.
+    pub column_order: Vec<String>,
+    /// Set of declared static field names (i.e. `column_order` minus
+    /// `json_field_name`).
+    pub static_fields: HashSet<String>,
+    /// Name of the catch-all JSON column.
+    pub json_field_name: String,
+}
+
+impl HttpJsonNesting {
+    /// Build a new nesting configuration from the declared column order
+    /// and the name of the catch-all column. The catch-all column name
+    /// must appear in `column_order`.
+    #[must_use]
+    pub fn new(column_order: Vec<String>, json_field_name: String) -> Self {
+        let static_fields: HashSet<String> = column_order
+            .iter()
+            .filter(|c| c.as_str() != json_field_name.as_str())
+            .cloned()
+            .collect();
+        Self {
+            column_order,
+            static_fields,
+            json_field_name,
+        }
+    }
+}
+
+/// Decomposed representation of a single HTTP JSON response row.
+///
+/// Keys are column names. `None` values represent SQL `NULL`.
+pub type DecomposedRow = HashMap<String, Option<String>>;
+
+/// Decompose a single JSON row string according to the nesting
+/// configuration. See [`HttpJsonNesting`] for the semantics.
+///
+/// Behavior for non-object JSON rows (arrays, primitives): the entire
+/// row is placed into the catch-all column and every declared static
+/// field resolves to `NULL`. This preserves data without silently
+/// dropping values.
+pub fn decompose_json_row(json_row: &str, nesting: &HttpJsonNesting) -> Result<DecomposedRow> {
+    let value: serde_json::Value = serde_json::from_str(json_row).context(JsonParseSnafu)?;
+
+    let mut out: DecomposedRow = HashMap::new();
+
+    match value {
+        serde_json::Value::Object(map) => {
+            // Use BTreeMap so the serialized catch-all has deterministic,
+            // sorted keys (matches DynamoDB's `json_nest` behavior).
+            let mut catchall: BTreeMap<String, serde_json::Value> = BTreeMap::new();
+
+            for (k, v) in map {
+                if nesting.static_fields.contains(&k) {
+                    out.insert(k, json_value_to_string(v));
+                } else {
+                    catchall.insert(k, v);
+                }
+            }
+
+            // Any declared static field that was absent from the row
+            // becomes explicit NULL rather than being missing from the
+            // batch.
+            for name in &nesting.static_fields {
+                out.entry(name.clone()).or_insert(None);
+            }
+
+            let catchall_str = if catchall.is_empty() {
+                None
+            } else {
+                Some(serde_json::to_string(&catchall).context(JsonSerializeSnafu)?)
+            };
+            out.insert(nesting.json_field_name.clone(), catchall_str);
+        }
+        other => {
+            // Non-object row: preserve it in the catch-all column so no
+            // data is lost. Static fields are NULL.
+            for name in &nesting.static_fields {
+                out.insert(name.clone(), None);
+            }
+            out.insert(
+                nesting.json_field_name.clone(),
+                Some(serde_json::to_string(&other).context(JsonSerializeSnafu)?),
+            );
+        }
+    }
+
+    Ok(out)
+}
+
+/// Convert a JSON value to the string representation used for static
+/// columns. JSON strings are emitted verbatim (no surrounding quotes);
+/// objects and arrays are re-serialized to JSON text; `null` maps to
+/// SQL `NULL`.
+fn json_value_to_string(v: serde_json::Value) -> Option<String> {
+    match v {
+        serde_json::Value::Null => None,
+        serde_json::Value::String(s) => Some(s),
+        other => Some(other.to_string()),
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use serde_json::json;
+
+    fn nesting(cols: &[&str], json_field: &str) -> HttpJsonNesting {
+        HttpJsonNesting::new(
+            cols.iter().map(|s| (*s).to_string()).collect(),
+            json_field.to_string(),
+        )
+    }
+
+    #[test]
+    fn decomposes_object_into_static_and_catchall() {
+        let n = nesting(&["id", "title", "data"], "data");
+        let row = json!({
+            "id": "abc",
+            "title": "hello",
+            "description": "a value",
+            "count": 42,
+            "nested": {"x": 1, "y": [1, 2]}
+        })
+        .to_string();
+
+        let d = decompose_json_row(&row, &n).expect("decompose");
+
+        assert_eq!(d.get("id").expect("id").as_deref(), Some("abc"));
+        assert_eq!(d.get("title").expect("title").as_deref(), Some("hello"));
+
+        let catchall = d.get("data").expect("data").as_deref().expect("catchall");
+        let parsed: serde_json::Value = serde_json::from_str(catchall).expect("parse catchall");
+        assert_eq!(parsed["description"], "a value");
+        assert_eq!(parsed["count"], 42);
+        assert_eq!(parsed["nested"]["x"], 1);
+    }
+
+    #[test]
+    fn missing_static_field_is_null() {
+        let n = nesting(&["id", "title", "data"], "data");
+        let row = json!({"id": "abc"}).to_string();
+        let d = decompose_json_row(&row, &n).expect("decompose");
+        assert_eq!(d.get("id").expect("id").as_deref(), Some("abc"));
+        assert!(d.get("title").expect("title").is_none());
+        // no extra keys => catch-all is NULL
+        assert!(d.get("data").expect("data").is_none());
+    }
+
+    #[test]
+    fn complex_static_field_serialized_as_json() {
+        let n = nesting(&["payload", "data"], "data");
+        let row = json!({"payload": {"a": 1}, "extra": "e"}).to_string();
+        let d = decompose_json_row(&row, &n).expect("decompose");
+        let payload = d.get("payload").expect("payload").as_deref().expect("val");
+        let parsed: serde_json::Value = serde_json::from_str(payload).expect("parse");
+        assert_eq!(parsed["a"], 1);
+        assert!(d.get("data").expect("data").is_some());
+    }
+
+    #[test]
+    fn non_object_row_goes_to_catchall() {
+        let n = nesting(&["id", "data"], "data");
+        let row = json!([1, 2, 3]).to_string();
+        let d = decompose_json_row(&row, &n).expect("decompose");
+        assert!(d.get("id").expect("id").is_none());
+        assert_eq!(d.get("data").expect("data").as_deref(), Some("[1,2,3]"));
+    }
+
+    #[test]
+    fn catchall_keys_are_sorted() {
+        let n = nesting(&["id", "data"], "data");
+        let row = json!({"id": "x", "zeta": 1, "alpha": 2, "mu": 3}).to_string();
+        let d = decompose_json_row(&row, &n).expect("decompose");
+        let catchall = d.get("data").expect("data").as_deref().expect("val");
+        // BTreeMap ordering => keys alphabetical
+        assert_eq!(catchall, r#"{"alpha":2,"mu":3,"zeta":1}"#);
+    }
+
+    #[test]
+    fn null_static_field_stays_null() {
+        let n = nesting(&["id", "data"], "data");
+        let row = json!({"id": null, "extra": 1}).to_string();
+        let d = decompose_json_row(&row, &n).expect("decompose");
+        assert!(d.get("id").expect("id").is_none());
+    }
+}
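
For readers without the crate checked out, the object-row contract of `decompose_json_row` (static columns, sorted catch-all, NULL for absent keys, NULL catch-all when nothing is left over) can be reproduced with the standard library alone. This is a minimal sketch, not the code from the diff: the `Json` enum is a hypothetical, reduced stand-in for `serde_json::Value`, `decompose` is a hypothetical name, string escaping is skipped, and the non-object branch is omitted.

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

/// Hypothetical, reduced stand-in for `serde_json::Value`.
#[derive(Debug, Clone, PartialEq)]
enum Json {
    Null,
    Num(i64),
    Str(String),
}

impl Json {
    /// JSON text used inside the catch-all object (no escaping: sketch only).
    fn to_json_text(&self) -> String {
        match self {
            Json::Null => "null".to_string(),
            Json::Num(n) => n.to_string(),
            Json::Str(s) => format!("\"{}\"", s),
        }
    }

    /// Text for a static column: strings verbatim, `null` becomes SQL NULL.
    fn to_column_text(&self) -> Option<String> {
        match self {
            Json::Null => None,
            Json::Str(s) => Some(s.clone()),
            other => Some(other.to_json_text()),
        }
    }
}

fn decompose(
    row: Vec<(String, Json)>,
    static_fields: &HashSet<String>,
    json_field: &str,
) -> HashMap<String, Option<String>> {
    let mut out = HashMap::new();
    // BTreeMap keeps catch-all keys sorted, so the output is deterministic.
    let mut catch_all: BTreeMap<String, Json> = BTreeMap::new();
    for (key, value) in row {
        if static_fields.contains(&key) {
            out.insert(key, value.to_column_text());
        } else {
            catch_all.insert(key, value);
        }
    }
    // Declared static fields absent from the row become explicit NULLs.
    for name in static_fields {
        out.entry(name.clone()).or_insert(None);
    }
    let body: Vec<String> = catch_all
        .iter()
        .map(|(k, v)| format!("\"{}\":{}", k, v.to_json_text()))
        .collect();
    // No leftover keys means the catch-all column is NULL, not "{}".
    let catch_all_text = if body.is_empty() {
        None
    } else {
        Some(format!("{{{}}}", body.join(",")))
    };
    out.insert(json_field.to_string(), catch_all_text);
    out
}

fn main() {
    let static_fields: HashSet<String> =
        ["id", "name"].iter().map(|s| s.to_string()).collect();
    let row = vec![
        ("id".to_string(), Json::Num(1)),
        ("status".to_string(), Json::Str("Ended".to_string())),
        ("runtime".to_string(), Json::Num(60)),
        ("ended".to_string(), Json::Null),
    ];
    let d = decompose(row, &static_fields, "details");
    println!("id      = {:?}", d["id"]);
    println!("name    = {:?}", d["name"]);
    println!("details = {:?}", d["details"]);
}
```

Note the asymmetry the sketch preserves from the diff: a JSON `null` in a static column becomes SQL `NULL`, while a `null` under a non-static key stays inside the catch-all object as a literal `null`.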

crates/data_components/src/http/mod.rs

Lines changed: 3 additions & 0 deletions
@@ -15,4 +15,7 @@ limitations under the License.
 */
 
 pub mod auth;
+pub mod json_nest;
 pub mod provider;
+
+pub use json_nest::HttpJsonNesting;
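
Review note on the re-exported `HttpJsonNesting`: its doc comment says the catch-all column name must appear in `column_order`, but `new` as shown does not check this. The commit message suggests `parse_http_json_nesting` validates the marker upstream, so the invariant may already hold in practice. If a local guard were wanted, a fallible constructor might look like the sketch below. `try_new` and the `String` error type are hypothetical, and the struct is a local copy so the example compiles without the crate.

```rust
use std::collections::HashSet;

/// Local copy of the struct's shape so the sketch is self-contained.
#[derive(Debug, Clone)]
struct HttpJsonNesting {
    column_order: Vec<String>,
    static_fields: HashSet<String>,
    json_field_name: String,
}

impl HttpJsonNesting {
    /// Hypothetical fallible constructor: rejects a catch-all name that is
    /// not one of the declared columns.
    fn try_new(column_order: Vec<String>, json_field_name: String) -> Result<Self, String> {
        if !column_order.iter().any(|c| c == &json_field_name) {
            return Err(format!(
                "catch-all column `{}` is not declared in `columns:`",
                json_field_name
            ));
        }
        // Same derivation as `new` in the diff: every column except the catch-all.
        let static_fields = column_order
            .iter()
            .filter(|c| c.as_str() != json_field_name.as_str())
            .cloned()
            .collect();
        Ok(Self {
            column_order,
            static_fields,
            json_field_name,
        })
    }
}

fn main() {
    let cols: Vec<String> = ["id", "name", "details"]
        .iter()
        .map(|s| s.to_string())
        .collect();

    let nesting = HttpJsonNesting::try_new(cols.clone(), "details".to_string())
        .expect("catch-all is declared");
    println!("order:     {:?}", nesting.column_order);
    println!("static:    {} fields", nesting.static_fields.len());
    println!("catch-all: {}", nesting.json_field_name);

    // An undeclared catch-all name is rejected instead of silently accepted.
    let bad = HttpJsonNesting::try_new(cols, "missing".to_string());
    println!("undeclared catch-all rejected: {}", bad.is_err());
}
```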
