Skip to content

Commit 5588bae

Browse files
fix(http): reject OR across different HTTP filter columns (spiceai#10625)
The HTTP connector's filter pushdown used the same PartitionAccumulator for both AND and OR operators, so an OR across different virtual filter columns (request_path, request_query, request_body, request_headers) was silently rewritten as a cross product (AND), causing the connector to issue a single combined HTTP request instead of two separate ones. DataFusion's Inexact pushdown then re-applied the predicate and the resulting row often passed, masking the bug while the upstream API received a request the user never wrote. Reject the cross-column OR case at extract_partitions time with a clear error pointing the user to IN (...) or separate queries / UNION ALL. OR within a single column continues to work (it's how DataFusion sometimes lowers short IN lists). Fixes spiceai#10623
1 parent 386669c commit 5588bae

2 files changed

Lines changed: 132 additions & 3 deletions

File tree

crates/data_components/src/http/provider.rs

Lines changed: 130 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2559,14 +2559,67 @@ impl HttpTableProvider {
25592559
) -> Result<()> {
25602560
match expr.op {
25612561
Operator::Eq => self.handle_equality_expr(expr, accumulator),
2562-
Operator::Or | Operator::And => {
2562+
Operator::And => {
2563+
self.extract_filter_values(expr.left.as_ref(), accumulator)?;
2564+
self.extract_filter_values(expr.right.as_ref(), accumulator)
2565+
}
2566+
Operator::Or => {
2567+
// OR within a single HTTP virtual filter column is treated as
2568+
// an IN list (alternative values). OR across different
2569+
// columns would be silently rewritten as a cross product
2570+
// (AND) by the partition accumulator, causing the connector
2571+
// to issue combined HTTP requests instead of separate ones.
2572+
// Reject the cross-column case explicitly. (Note: SQL
2573+
// `IN (...)` is sometimes pre-rewritten by DataFusion into a
2574+
// chain of OR-of-equality, which is why same-column OR must
2575+
// still be accepted.)
2576+
let mut columns = HashSet::new();
2577+
Self::collect_http_filter_columns(expr.left.as_ref(), &mut columns);
2578+
Self::collect_http_filter_columns(expr.right.as_ref(), &mut columns);
2579+
if columns.len() > 1 {
2580+
let mut names: Vec<&str> = columns.into_iter().collect();
2581+
names.sort_unstable();
2582+
return Err(Error::FilterRejected {
2583+
message: format!(
2584+
"OR across different HTTP filter columns ({}) is not supported because the connector would otherwise issue combined HTTP requests instead of separate ones. Use IN (...) to enumerate values on a single column, or run separate queries (e.g. UNION ALL) for alternative requests.",
2585+
names.join(", ")
2586+
),
2587+
});
2588+
}
25632589
self.extract_filter_values(expr.left.as_ref(), accumulator)?;
25642590
self.extract_filter_values(expr.right.as_ref(), accumulator)
25652591
}
25662592
_ => Ok(()),
25672593
}
25682594
}
25692595

2596+
/// Walk an expression tree and collect the names of any HTTP virtual
2597+
/// filter columns (`request_path`, `request_query`, `request_body`,
2598+
/// `request_headers`) referenced anywhere inside it.
2599+
fn collect_http_filter_columns(expr: &Expr, columns: &mut HashSet<&'static str>) {
2600+
match expr {
2601+
Expr::BinaryExpr(BinaryExpr { left, right, .. }) => {
2602+
Self::collect_http_filter_columns(left.as_ref(), columns);
2603+
Self::collect_http_filter_columns(right.as_ref(), columns);
2604+
}
2605+
Expr::InList(in_list) => {
2606+
Self::collect_http_filter_columns(in_list.expr.as_ref(), columns);
2607+
}
2608+
Expr::Column(column) => {
2609+
if let Some(static_name) = match column.name.as_str() {
2610+
"request_path" => Some("request_path"),
2611+
"request_query" => Some("request_query"),
2612+
"request_body" => Some("request_body"),
2613+
"request_headers" => Some("request_headers"),
2614+
_ => None,
2615+
} {
2616+
columns.insert(static_name);
2617+
}
2618+
}
2619+
_ => {}
2620+
}
2621+
}
2622+
25702623
fn handle_equality_expr(
25712624
&self,
25722625
expr: &BinaryExpr,
@@ -3228,6 +3281,82 @@ mod tests {
32283281
assert!(partitions.contains(&(Some("/api/v2".to_string()), None, None, None)));
32293282
}
32303283

3284+
#[test]
3285+
fn test_extract_partitions_with_or_across_columns_is_rejected() {
3286+
let provider = base_provider()
3287+
.with_allowed_paths(vec!["/a".to_string()])
3288+
.expect("allowed paths")
3289+
.enable_query_filters(64);
3290+
// request_path = '/a' OR request_query = 'b=1'
3291+
let filters = vec![Expr::BinaryExpr(BinaryExpr {
3292+
left: Box::new(Expr::BinaryExpr(BinaryExpr {
3293+
left: Box::new(Expr::Column(Column::from_name("request_path"))),
3294+
op: Operator::Eq,
3295+
right: Box::new(Expr::Literal(
3296+
ScalarValue::Utf8(Some("/a".to_string())),
3297+
None,
3298+
)),
3299+
})),
3300+
op: Operator::Or,
3301+
right: Box::new(Expr::BinaryExpr(BinaryExpr {
3302+
left: Box::new(Expr::Column(Column::from_name("request_query"))),
3303+
op: Operator::Eq,
3304+
right: Box::new(Expr::Literal(
3305+
ScalarValue::Utf8(Some("b=1".to_string())),
3306+
None,
3307+
)),
3308+
})),
3309+
})];
3310+
3311+
let err = provider
3312+
.extract_partitions(&filters)
3313+
.expect_err("OR across HTTP virtual columns must be rejected");
3314+
let message = err.to_string();
3315+
assert!(
3316+
message.contains("OR across different HTTP filter columns"),
3317+
"unexpected error message: {message}"
3318+
);
3319+
assert!(message.contains("request_path"));
3320+
assert!(message.contains("request_query"));
3321+
}
3322+
3323+
#[test]
3324+
fn test_extract_partitions_with_or_across_columns_nested_is_rejected() {
3325+
let provider = base_provider()
3326+
.with_allowed_paths(vec!["/a".to_string()])
3327+
.expect("allowed paths")
3328+
.enable_header_filters(DEFAULT_MAX_HEADERS_LENGTH, vec!["x-sandbox-id".to_string()])
3329+
.expect("enable header filters");
3330+
// request_path = '/a' OR request_headers IN ('{"x-sandbox-id":"a"}')
3331+
let filters = vec![Expr::BinaryExpr(BinaryExpr {
3332+
left: Box::new(Expr::BinaryExpr(BinaryExpr {
3333+
left: Box::new(Expr::Column(Column::from_name("request_path"))),
3334+
op: Operator::Eq,
3335+
right: Box::new(Expr::Literal(
3336+
ScalarValue::Utf8(Some("/a".to_string())),
3337+
None,
3338+
)),
3339+
})),
3340+
op: Operator::Or,
3341+
right: Box::new(Expr::InList(InList {
3342+
expr: Box::new(Expr::Column(Column::from_name("request_headers"))),
3343+
list: vec![Expr::Literal(
3344+
ScalarValue::Utf8(Some(r#"{"x-sandbox-id":"a"}"#.to_string())),
3345+
None,
3346+
)],
3347+
negated: false,
3348+
})),
3349+
})];
3350+
3351+
let err = provider
3352+
.extract_partitions(&filters)
3353+
.expect_err("OR across HTTP virtual columns must be rejected");
3354+
assert!(
3355+
err.to_string()
3356+
.contains("OR across different HTTP filter columns")
3357+
);
3358+
}
3359+
32313360
#[test]
32323361
fn test_extract_partitions_with_combined_filters() {
32333362
let provider = base_provider()

docs/examples/http_refresh_sql_example.md

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,7 @@ The HTTP connector's `refresh_sql` supports the following filter expressions on
124124

125125
1. **Equality (`=`)**: `WHERE request_path = '/api/users'`
126126
2. **IN Lists**: `WHERE request_path IN ('/api/users', '/api/posts')`
127-
3. **OR expressions**: `WHERE request_path = '/api/users' OR request_path = '/api/posts'`
127+
3. **OR expressions** (single column only): `WHERE request_path = '/api/users' OR request_path = '/api/posts'`. OR across **different** filter columns is not supported — use separate queries (e.g. `UNION ALL`).
128128
4. **AND expressions**: `WHERE request_path = '/api/users' AND request_query = 'limit=10'`
129129
5. **POST requests**: `WHERE request_body = '{"key": "value"}'` (triggers POST with `http_post_content_type`)
130130
6. **Dynamic headers**: `WHERE request_headers IN ('{"x-sandbox-id":"sandbox-1"}', '{"x-sandbox-id":"sandbox-2"}')`
@@ -153,7 +153,7 @@ The HTTP connector's `refresh_sql` supports the following filter expressions on
153153
## Performance Considerations
154154

155155
- **Caching**: The HTTP connector respects `Cache-Control` headers and caches responses when `max-age` is set.
156-
- **Parallel Execution**: Multiple endpoints (from IN lists or OR expressions) are fetched in parallel.
156+
- **Parallel Execution**: Multiple endpoints (from IN lists or single-column OR expressions) are fetched in parallel.
157157
- **Filter Selectivity**: Use specific filters to minimize unnecessary HTTP requests.
158158
- **Partition Limits**: Use `max_request_partitions` to cap the number of HTTP requests created from cross-product filters.
159159

0 commit comments

Comments
 (0)