Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 75 additions & 6 deletions crates/data_components/src/http/json_nest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,15 @@ limitations under the License.
//! declared static field is projected as a top-level `Utf8` column, and
//! all remaining keys are serialized into a JSON object string stored in
//! the catch-all column.
//!
//! In addition, declared columns whose names match one of the HTTP
//! connector's built-in metadata fields (`request_path`,
//! `request_query`, `request_body`, `request_headers`, `content`,
//! `response_status`, `response_headers`, `fetched_at`) are *passed
//! through* from the HTTP request/response rather than being decomposed
//! from the JSON body. This lets queries reference both decomposed
//! columns and the original HTTP metadata (e.g. for direct fetches via
//! filter pushdown on `request_path`).

use snafu::{ResultExt, Snafu};
use std::collections::{BTreeMap, HashMap, HashSet};
Expand Down Expand Up @@ -65,26 +74,41 @@ pub struct HttpJsonNesting {
/// position.
pub column_order: Vec<String>,
/// Set of declared static field names (i.e. `column_order` minus
/// `json_field_name`).
/// `json_field_name` and `metadata_fields`). These are extracted as
/// top-level keys from each JSON response row.
pub static_fields: HashSet<String>,
/// Set of declared columns sourced from HTTP request/response
/// metadata rather than from the JSON body. Names must match
/// fields in [`HttpTableProvider::base_table_schema`].
///
/// [`HttpTableProvider::base_table_schema`]: super::provider::HttpTableProvider::base_table_schema
pub metadata_fields: HashSet<String>,
/// Name of the catch-all JSON column.
pub json_field_name: String,
}

impl HttpJsonNesting {
/// Build a new nesting configuration from the declared column order
/// and the name of the catch-all column. The catch-all column name
/// must appear in `column_order`.
/// Build a new nesting configuration from the declared column order,
/// the name of the catch-all column, and the set of declared columns
/// that should be sourced from HTTP metadata rather than the JSON
/// body. The catch-all column name must appear in `column_order`.
#[must_use]
pub fn new(column_order: Vec<String>, json_field_name: String) -> Self {
pub fn new(
column_order: Vec<String>,
json_field_name: String,
metadata_fields: HashSet<String>,
) -> Self {
let static_fields: HashSet<String> = column_order
.iter()
.filter(|c| c.as_str() != json_field_name.as_str())
.filter(|c| {
c.as_str() != json_field_name.as_str() && !metadata_fields.contains(c.as_str())
})
.cloned()
.collect();
Self {
column_order,
static_fields,
metadata_fields,
json_field_name,
}
}
Expand Down Expand Up @@ -114,6 +138,14 @@ pub fn decompose_json_row(json_row: &str, nesting: &HttpJsonNesting) -> Result<D
let mut catchall: BTreeMap<String, serde_json::Value> = BTreeMap::new();

for (k, v) in map {
if nesting.metadata_fields.contains(&k) {
// Body keys colliding with HTTP metadata names are
// ignored here; the metadata column is populated
// from the actual HTTP request/response, not from
// the body. Drop the body key from both the static
// and catch-all outputs.
continue;
}
if nesting.static_fields.contains(&k) {
out.insert(k, json_value_to_string(v));
} else {
Expand Down Expand Up @@ -172,6 +204,15 @@ mod tests {
HttpJsonNesting::new(
cols.iter().map(|s| (*s).to_string()).collect(),
json_field.to_string(),
HashSet::new(),
)
}

fn nesting_with_meta(cols: &[&str], json_field: &str, meta: &[&str]) -> HttpJsonNesting {
HttpJsonNesting::new(
cols.iter().map(|s| (*s).to_string()).collect(),
json_field.to_string(),
meta.iter().map(|s| (*s).to_string()).collect(),
)
}

Expand Down Expand Up @@ -247,4 +288,32 @@ mod tests {
let d = decompose_json_row(&row, &n).expect("decompose");
assert!(d.get("id").expect("id").is_none());
}

#[test]
fn metadata_field_is_not_extracted_from_body() {
// `request_path` is declared as a metadata column. Even if the
// body contains a `request_path` key, it must not appear in the
// decomposed output (the metadata column is filled separately
// by the batch builder) and must not leak into the catch-all.
let n = nesting_with_meta(&["request_path", "id", "data"], "data", &["request_path"]);
let row = json!({
"request_path": "/should-be-ignored",
"id": "abc",
"extra": 1
})
.to_string();
let d = decompose_json_row(&row, &n).expect("decompose");
assert!(
!d.contains_key("request_path"),
"metadata field must not be populated from body"
);
assert_eq!(d.get("id").expect("id").as_deref(), Some("abc"));
let catchall = d.get("data").expect("data").as_deref().expect("val");
let parsed: serde_json::Value = serde_json::from_str(catchall).expect("parse");
assert!(
parsed.get("request_path").is_none(),
"metadata field must not leak into catch-all"
);
assert_eq!(parsed["extra"], 1);
}
}
Loading
Loading