Skip to content

Commit 924691c

Browse files
lukekimkrinart
andauthored
feat(datafusion): flatten_json_properties + json_tree UDTFs (spiceai#10406)
* feat(datafusion): add flatten_json_properties and json_tree UDTFs (M1) M1 skeleton of the `flatten_json_properties` table function from spiceai#10399 — recursively walks a JSON-Schema-shaped document's `properties` tree and emits one row per field with path, parent_path, name, description, type, required, format, enum_values, and metadata columns. Also adds `json_tree`, a schema-agnostic recursive JSON walker modeled on DuckDB/SQLite's function of the same name (cols: key, value, type, atom, id, parent, fullkey, path) so users have a generic alternative when their input isn't JSON-Schema-shaped. Both are experimental and gated behind `flatten-json-properties` and `json-tree` Cargo features (off by default). M1 accepts only literal JSON string arguments; per-row LATERAL invocation with a column reference lands in M2 alongside `$ref` / `allOf` / `oneOf` / `anyOf` resolution, `items.properties`, `additionalProperties` maps, the options struct, cycle detection, and metrics. Refs spiceai#10399 * feat(datafusion): complete M2-M4 for flatten_json_properties + json_tree M1 shipped a `properties`-only skeleton behind a feature flag. This commit lands the rest of the milestones for both functions. M2 — Full shape coverage: - `items.properties` — arrays of objects; leaves emit at `array.field`. - `additionalProperties` — typed maps; `type = "map"` and children at `map.child`. - `allOf` / `oneOf` / `anyOf` — fields merged across branches with first- declaration dedupe; `required` is union across branches. - Local `$ref` resolution (JSON Pointer syntax, including `~0` / `~1` escapes) with an active-ref set for cycle detection — cycles yield a `kind=cycle` metric, no stack overflow. - External `$ref` URIs — surfaced as `type = "ref"` rows with the URI captured in `metadata`. Never dereferenced (no network / file IO). - Options surface (named args on both UDTF and planning path): `max_depth`, `max_rows`, `max_bytes`, `dialect`, `include_internal`, `path_style` (`dot` or `json-pointer`). - OpenTelemetry counters: `flatten_json_properties_invocations_total`, `_rows_emitted_total`, `_errors_total{kind}` where kind ∈ {parse, depth_exceeded, row_cap_hit, cycle, input_too_large}. Same set for `json_tree` (with applicable kinds). - Scalar UDF companion registered under the same name, returning `List<Struct<...>>` — gives per-row / LATERAL semantics via `UNNEST(flatten_json_properties(s.body))`. `json_tree` brought to parity: max_depth / max_bytes options, scalar UDF variant, cycle-independent depth cap, metrics. M3 — UX + perf: - Cookbook recipe at `examples/flatten-json-properties/` with a worked spicepod.yaml (dataset → view via UNNEST → DuckDB acceleration → column-level embeddings → vector_search) plus a 3-document sample. - Bench harness at `crates/runtime/benches/flatten_json_properties.rs` with Criterion groups for flat-schema fan-out, nested depth, and a 1k-schema catalog simulation. M4 — Release decision: - Feature flags dropped. Both UDTFs + UDFs register unconditionally on every build. Default behavior change vs M1: `include_internal` is now `false` (spec default), so container rows (`object` / `array` / `map`) are suppressed unless the caller opts in. 32 unit tests covering the full shape matrix, ref resolution, cycle termination, option parsing, limit tripping, path-style variants, scalar UDF per-row dispatch with NULLs, and UDTF plan integration. Refs spiceai#10399 * refactor(datafusion/udtf): simplify walker per /simplify review - Replace hand-rolled `resolve_local_ref` with `serde_json::Value::pointer`. - Delete `collect_effective_owned` and the `Cow<'static>` lifetime-laundering dance; everything walked lives under the walker's `&'a Value` root, so `&'a Value` suffices. Removes two identical recursion paths and the deep target clone on every `$ref` resolution. - Drop the dead `depth` parameter from `collect_effective`. - Hoist `property_fields` / `tree_fields` into static `LazyLock<Fields>` handles so the schema isn't reallocated on every call. - Extract `build_tree_arrays` in `json_tree` so `rows_to_batch` and the scalar-UDF struct-array builder share one implementation. - Borrow-not-clone for `HashSet<&str>` required / seen_names in the walker. - Strip WHAT-style comments and task-references from the bench. * fix(datafusion/udtf): address PR review feedback - Update copyright headers to 2024-2026 across the new UDTF files. - Tighten scalar UDF signatures (`flatten_json_properties` / `json_tree`) to accept Utf8 / LargeUtf8 / Utf8View; normalize via `cast` so non-Utf8 string columns no longer panic in `as_string_array`. - Cap combinator / `$ref` expansion in `collect_effective` by threading a ref-depth counter through recursion; prevents pathological chains from bypassing `max_depth` / exhausting the stack. - Clarify `dialect` option semantics in docs: currently only tags invocation metrics; OpenAPI-specific walker behavior is future scope. - `compute_type` no longer treats non-object `properties` / non-object-or- array `items` as `object` / `array`. - Collapse duplicate-row emission in `handle_field`: recurse once on the original `spec` so `walk_schema`'s `seen_names` de-duplicates fields across allOf/oneOf/anyOf / `$ref` branches. - Document single-node-only scan for both UDTFs (cluster mode requires a `UdtfArgs` proto variant + codec, tracked as follow-up). - Fix three branch-local clippy `collapsible_if` errors and annotate `emit_row`'s argument count. * fix(datafusion/udtf): address second round of PR review - `json_tree`: root row now emits `path = NULL` (field is nullable) to match DuckDB / SQLite `json_tree` semantics; children still carry the parent fullkey as `path`. - `json_tree`: array element rows now set `key = idx.to_string()` so consumers can distinguish array siblings (previously NULL). - `flatten_json_properties`: container fields with no walkable children (array of primitives, map of primitives, empty object) are now emitted as leaf rows in `include_internal = false` mode, so the field still appears in output. - Deny `flatten_json_properties` / `json_tree` scalar UDFs for federation pushdown; add them to the existing `deny_list_blocks_spice_builtins` test so regressions are caught. README double-pipe comment was a false positive (the file already uses single `|` with `\|` escapes inside cells). * fix(datafusion/udtf): address round-3 PR review - `json_tree`: add `max_rows` option (default 1,000,000) so bounded `max_bytes` input can't explode into unbounded row counts. Walker records `row_cap_hit` metric when hit and truncates cleanly. - `json_tree`: clarify module-level docs — named options are UDTF-form only; the scalar UDF takes just the JSON argument with default caps. - Both scalar UDFs now truncate deterministically at `i32::MAX` flattened rows (with a `row_cap_hit` metric) instead of returning a query-level `Execution` error on `List<Struct>` offset overflow. Preserves the "never a query-level error" contract. Not addressed: re-raised comments on `DataSourceExec` / cluster-mode `UdtfExec` wrapping — documented as follow-up scope in the prior commit; wrapping requires a new `UdtfArgs` proto variant + codec. * style: cargo fmt line-wrap in flatten_json_properties scalar UDF * fix(datafusion/udtf): bracket-quote JSON-path keys with hyphens SQLite / DuckDB `json_tree` path shorthand only accepts identifier-style keys; anything else must be bracket-quoted so consumers can re-parse the `fullkey`. Previously a key like `has-hyphen` was rendered as `$.a-b`, which isn't a valid shorthand. Now forces bracket-quoting for keys with any non-identifier character, and extends the existing special-character test to cover hyphens. * fix(datafusion/udtf): switch scalar UDFs to LargeList<Struct<...>> Copilot flagged that i32 ListArray offsets could silently truncate results when the flattened row count across a batch exceeds i32::MAX (only a metric signal was emitted). Silent incomplete results risk query correctness. Switching to LargeList (i64 offsets) makes overflow effectively impossible with no behavior change — UNNEST works transparently on both variants. Drops the `max_flattened_rows` truncation path entirely. * style(datafusion/udtf): fix pedantic clippy + fmt errors CI's `make lint-rust` uses `clippy::pedantic + clippy::allow_attributes + clippy::unwrap_used + clippy::expect_used`, which surfaced: - `#[allow(clippy::too_many_arguments)]` → `#[expect(...)]` with reason (lint 1.81+ requires explicit expect for cleared warnings). - `doc_markdown`: backtick-wrap `UInt`, `Bool`, `Utf8`, numeric defaults, `DuckDB`, `SQLite`, `DoS`, `DataFusion`, `OpenAPI` in module docs. - `single_match_else` + `match_like_matches_macro`: rewrite the `serde_json::from_str` match as `let Ok(root) = ... else { ... }`. - `.unwrap()` on `key.chars().next()` in `escape_object_key` → `is_some_and`. - `name.to_string()` on `&String` → `name.clone()`. - `all_rows.len() as i64` → `i64::try_from(...).unwrap_or(i64::MAX)` (walker caps bound the count well under i64::MAX; saturate instead of unwrap since the lint config bans `.unwrap()`/`.expect()`). * fix(datafusion/udtf): type-union ordering + fail-loud on offset overflow - `compute_type`: when `"type"` is an array (JSON-Schema nullable syntax, e.g. `["null", "string"]`), pick the first non-null entry so optional fields classify as their real type instead of `"null"`. Falls back to `"null"` only when it's the sole type. Extended test coverage. - Both scalar UDFs: `i64::try_from(row_count)` now returns a `DataFusionError::Execution` on overflow instead of saturating to `i64::MAX`. Saturation would silently misalign `LargeList` offsets; erroring surfaces the (unreachable-in-practice) condition loudly. * fix(datafusion/udtf): cross-walk cycle detection + batch row cap - `walk_schema` now persists `$ref` insertion in `visited_refs` for the duration of the tree-walk recursion, not just for a single `collect_effective` pass. Fixes a leak where schemas like `{$defs: {Node: {properties: {next: {$ref: #/$defs/Node}}}}, properties: {root: {$ref: #/$defs/Node}}}` could descend past the first resolution boundary. Tightened `local_ref_cycle_terminates` to assert stopping at `root.next`. - Both scalar UDFs now error on `DataFusionError::Execution` if the accumulated cross-batch row count exceeds `SCALAR_BATCH_MAX_ROWS` (10M). Per-document caps bound single rows, but a wide batch could previously reach `number_rows * max_rows` in memory before returning. * fix(udtf): pass projection to MemorySourceConfig in json_properties and json_tree Both UDTFs were ignoring the projection parameter in scan(), causing a schema mismatch error when selecting specific columns (e.g. SELECT path, name, type FROM flatten_json_properties(...)). Pass projection.cloned() to MemorySourceConfig::try_new() so DataFusion can push column pruning down into the scan. * fix: format MemorySourceConfig initialization for better readability * Tests + Lint * fix(tests): improve error handling and assertions in JSON property tests * fix(tests): update projection comments for clarity in JSON schema tests --------- Co-authored-by: Viktor Yershov <viktor@spice.ai>
1 parent 4e679c0 commit 924691c

9 files changed

Lines changed: 2742 additions & 0 deletions

File tree

crates/runtime/Cargo.toml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -374,3 +374,7 @@ vortex-datafusion.workspace = true
374374
[[bench]]
375375
harness = false
376376
name = "prepared_statement"
377+
378+
[[bench]]
379+
harness = false
380+
name = "flatten_json_properties"
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
#![allow(clippy::expect_used)]
2+
3+
//! Benchmarks for `flatten_json_properties`.
4+
//!
5+
//! Exercises the walker in isolation (no `DataFusion` plumbing) so regressions
6+
//! attributable to the walker itself surface without noise from query planning
7+
//! or Arrow I/O. `bench_catalog_simulation` approximates the typical
8+
//! materialization shape — 1k schemas × 50 fields per schema.
9+
10+
use std::hint::black_box;
11+
12+
use criterion::{BenchmarkId, Criterion, Throughput, criterion_group, criterion_main};
13+
use runtime::datafusion::udtf::json_properties::{FlattenOptions, flatten_with_options};
14+
15+
fn synthetic_schema(num_fields: usize) -> String {
16+
// One flat object with `num_fields` primitive properties. Representative of
17+
// a wide data-product schema where most fields are leaves.
18+
let mut props = String::from("{");
19+
for i in 0..num_fields {
20+
if i > 0 {
21+
props.push(',');
22+
}
23+
props.push_str(&format!(
24+
r#""field_{i}":{{"type":"string","description":"Field {i}","format":"text"}}"#
25+
));
26+
}
27+
props.push('}');
28+
format!(r#"{{"properties":{props}}}"#)
29+
}
30+
31+
fn nested_schema(depth: usize) -> String {
32+
// Deeply nested single-chain schema. Exercises the recursion path.
33+
let mut inner = String::from(r#"{"type":"string"}"#);
34+
for _ in 0..depth {
35+
inner = format!(r#"{{"type":"object","properties":{{"n":{inner}}}}}"#);
36+
}
37+
format!(r#"{{"properties":{{"root":{inner}}}}}"#)
38+
}
39+
40+
fn bench_flat_schemas(c: &mut Criterion) {
41+
let opts = FlattenOptions {
42+
include_internal: true,
43+
..FlattenOptions::default()
44+
};
45+
let mut group = c.benchmark_group("flatten_json_properties/flat");
46+
for fields in [16usize, 128, 512] {
47+
let doc = synthetic_schema(fields);
48+
group.throughput(Throughput::Elements(fields as u64));
49+
group.bench_with_input(BenchmarkId::new("fields", fields), &doc, |b, doc| {
50+
b.iter(|| {
51+
let rows = flatten_with_options(black_box(doc), &opts);
52+
black_box(rows);
53+
});
54+
});
55+
}
56+
group.finish();
57+
}
58+
59+
fn bench_nested_schemas(c: &mut Criterion) {
60+
let opts = FlattenOptions {
61+
include_internal: true,
62+
max_depth: 32,
63+
..FlattenOptions::default()
64+
};
65+
let mut group = c.benchmark_group("flatten_json_properties/nested");
66+
for depth in [4usize, 8, 16] {
67+
let doc = nested_schema(depth);
68+
group.throughput(Throughput::Elements(depth as u64));
69+
group.bench_with_input(BenchmarkId::new("depth", depth), &doc, |b, doc| {
70+
b.iter(|| {
71+
let rows = flatten_with_options(black_box(doc), &opts);
72+
black_box(rows);
73+
});
74+
});
75+
}
76+
group.finish();
77+
}
78+
79+
fn bench_catalog_simulation(c: &mut Criterion) {
80+
let opts = FlattenOptions::default();
81+
let doc = synthetic_schema(50);
82+
c.bench_function("flatten_json_properties/catalog_1k_schemas", |b| {
83+
b.iter(|| {
84+
for _ in 0..1000 {
85+
let rows = flatten_with_options(black_box(&doc), &opts);
86+
black_box(rows);
87+
}
88+
});
89+
});
90+
}
91+
92+
criterion_group!(
93+
benches,
94+
bench_flat_schemas,
95+
bench_nested_schemas,
96+
bench_catalog_simulation
97+
);
98+
criterion_main!(benches);

crates/runtime/src/datafusion/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,7 @@ pub mod secrets_context_extension;
131131
pub mod sort_columns;
132132
pub(crate) mod sql_validator;
133133
pub mod udf;
134+
pub mod udtf;
134135

135136
pub const SPICE_DEFAULT_CATALOG: &str = "spice";
136137
pub const SPICE_RUNTIME_SCHEMA: &str = "runtime";

crates/runtime/src/datafusion/udf.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,10 @@ limitations under the License.
1717
use std::collections::HashSet;
1818
use std::sync::{Arc, LazyLock};
1919

20+
use crate::datafusion::udtf::json_properties::{
21+
FLATTEN_JSON_PROPERTIES_UDTF_NAME, FlattenJsonPropertiesScalar, FlattenJsonPropertiesTableFunc,
22+
};
23+
use crate::datafusion::udtf::json_tree::{JSON_TREE_UDTF_NAME, JsonTreeScalar, JsonTreeTableFunc};
2024
use crate::embeddings::udtf::{VECTOR_SEARCH_UDTF_NAME, VectorSearchTableFunc};
2125
use crate::search::full_text::udtf::{TEXT_SEARCH_UDTF_NAME, TextSearchTableFunc};
2226
use crate::search::rrf;
@@ -80,6 +84,17 @@ pub async fn register_udfs(runtime: &crate::Runtime) {
8084
Arc::new(rrf::ReciprocalRankFusion::from_ctx(ctx)),
8185
);
8286

87+
// `flatten_json_properties` / `json_tree` — JSON-Schema and generic JSON
88+
// shredders. Registered as both UDTF (FROM-clause, literal input) and
89+
// ScalarUDF returning `List<Struct<...>>` (per-row / LATERAL via UNNEST).
90+
ctx.register_udtf(
91+
FLATTEN_JSON_PROPERTIES_UDTF_NAME,
92+
Arc::new(FlattenJsonPropertiesTableFunc::new()),
93+
);
94+
ctx.register_udf(FlattenJsonPropertiesScalar::new().into());
95+
ctx.register_udtf(JSON_TREE_UDTF_NAME, Arc::new(JsonTreeTableFunc::new()));
96+
ctx.register_udf(JsonTreeScalar::new().into());
97+
8398
#[cfg(feature = "models")]
8499
{
85100
ctx.register_udf(embed::Embed::new(runtime.embeds()).into());
@@ -101,6 +116,8 @@ static DENY_SPICE_SPECIFIC_FUNCTIONS: LazyLock<FunctionSupport> = LazyLock::new(
101116
#[cfg(feature = "models")]
102117
AI_UDF_NAME,
103118
DIGEST_UDF_NAME,
119+
FLATTEN_JSON_PROPERTIES_UDTF_NAME,
120+
JSON_TREE_UDTF_NAME,
104121
];
105122

106123
FunctionSupport::new(
@@ -191,6 +208,8 @@ mod tests {
191208
spice_udf(Bucket::new()),
192209
spice_udf(Truncate::new()),
193210
Arc::new(INSTANCE.clone()),
211+
spice_udf(FlattenJsonPropertiesScalar::new()),
212+
spice_udf(JsonTreeScalar::new()),
194213
];
195214

196215
for udf in spice_udfs {

0 commit comments

Comments
 (0)