Skip to content

Commit 0beb321

Browse files
sgrebnovlukekim
andauthored
feat(http): Support dynamic HTTP connector request params from subqueries (spiceai#10636)
* Add integration test for dynamic HTTP request header filters * Fix lint * feat(http): Support dynamic HTTP connector request params from subqueries and joins * Improve * Add join index * Remove InnerJoin support (simplify) * Add tests * Add more tests * Improve * Improve naming * Fix tests * Update * Fix test --------- Co-authored-by: Luke Kim <80174+lukekim@users.noreply.github.com>
1 parent 6a77410 commit 0beb321

12 files changed

Lines changed: 1560 additions & 5 deletions

File tree

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/data_components/src/http/provider.rs

Lines changed: 288 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ pub const DEFAULT_MAX_BODY_BYTES: usize = 16 * 1024; // 16 KiB
138138
pub const DEFAULT_MAX_HEADERS_LENGTH: usize = 16 * 1024; // 16 KiB
139139
pub const DEFAULT_PAGINATION_MAX_PAGES: usize = 100;
140140
const MAX_REQUEST_PATH_LENGTH: usize = 1024;
141-
type PartitionSpec = (
141+
pub type PartitionSpec = (
142142
Option<String>,
143143
Option<String>,
144144
Option<String>,
@@ -600,6 +600,11 @@ impl HttpTableProvider {
600600
Ok(self)
601601
}
602602

603+
#[must_use]
604+
pub fn max_request_partitions(&self) -> Option<usize> {
605+
self.max_request_partitions
606+
}
607+
603608
#[must_use]
604609
pub fn with_max_request_partitions(mut self, max_request_partitions: Option<usize>) -> Self {
605610
self.max_request_partitions = max_request_partitions;
@@ -1415,9 +1420,42 @@ pub struct HttpExec {
14151420
partitions: Vec<PartitionSpec>,
14161421
limit: Option<usize>,
14171422
properties: PlanProperties,
1423+
/// When `true`, the partitions are a template that will be expanded
1424+
/// at runtime by `HttpWithDeferredParamsExec`. Display shows `partitions=deferred`.
1425+
deferred_partitions: bool,
14181426
}
14191427

14201428
impl HttpExec {
1429+
/// Returns the provider used by this exec.
1430+
#[must_use]
1431+
pub fn provider(&self) -> &Arc<HttpTableProvider> {
1432+
&self.provider
1433+
}
1434+
1435+
/// Returns the maximum number of request partitions allowed, if configured.
1436+
#[must_use]
1437+
pub fn max_request_partitions(&self) -> Option<usize> {
1438+
self.provider.max_request_partitions()
1439+
}
1440+
1441+
/// Returns the partition specs.
1442+
#[must_use]
1443+
pub fn partitions(&self) -> &[PartitionSpec] {
1444+
&self.partitions
1445+
}
1446+
1447+
/// Returns the limit.
1448+
#[must_use]
1449+
pub fn limit(&self) -> Option<usize> {
1450+
self.limit
1451+
}
1452+
1453+
/// Returns the projected schema.
1454+
#[must_use]
1455+
pub fn projected_schema(&self) -> &SchemaRef {
1456+
&self.projected_schema
1457+
}
1458+
14211459
#[must_use]
14221460
pub fn new(
14231461
projected_schema: SchemaRef,
@@ -1437,9 +1475,85 @@ impl HttpExec {
14371475
partitions,
14381476
limit,
14391477
properties,
1478+
deferred_partitions: false,
14401479
}
14411480
}
14421481

1482+
/// Mark this `HttpExec` as having dynamic partitions that will be
1483+
/// expanded at runtime. Affects EXPLAIN display only.
1484+
#[must_use]
1485+
pub fn with_deferred_partitions(mut self) -> Self {
1486+
self.deferred_partitions = true;
1487+
self
1488+
}
1489+
1490+
/// Create a new `HttpExec` whose partitions are the cross-product of the
1491+
/// current partitions and the given `values`, injected into the column
1492+
/// identified by `col_name` (`request_path`, `request_query`,
1493+
/// `request_body`, or `request_headers`).
1494+
///
1495+
/// Returns an error if the resulting partition count would exceed
1496+
/// `max_request_partitions`.
1497+
pub fn with_expanded_params(
1498+
&self,
1499+
col_name: &str,
1500+
values: &[String],
1501+
) -> DataFusionResult<Self> {
1502+
let existing = &self.partitions;
1503+
let new_count = existing.len() * values.len();
1504+
1505+
if let Some(max) = self.max_request_partitions()
1506+
&& new_count > max
1507+
{
1508+
return Err(DataFusionError::Plan(format!(
1509+
"HttpExec: expanding params would create {new_count} partitions (existing {} x {} values), which exceeds max_request_partitions={max}. Reduce the number of dynamic values or increase max_request_partitions.",
1510+
existing.len(),
1511+
values.len(),
1512+
)));
1513+
}
1514+
1515+
let mut new_partitions = Vec::with_capacity(new_count);
1516+
1517+
for partition in existing {
1518+
for value in values {
1519+
let mut p = partition.clone();
1520+
match col_name {
1521+
"request_path" => {
1522+
p.0 = Some(self.provider.ensure_allowed_path(value)?);
1523+
}
1524+
"request_query" => {
1525+
p.1 = Some(self.provider.ensure_allowed_query(value)?);
1526+
}
1527+
"request_body" => {
1528+
p.2 = Some(self.provider.ensure_allowed_body(value)?);
1529+
}
1530+
"request_headers" => {
1531+
p.3 = Some(self.provider.ensure_allowed_headers(value)?);
1532+
}
1533+
other => {
1534+
return Err(DataFusionError::Internal(format!(
1535+
"HttpExec::with_expanded_params: unsupported column '{other}'. Expected one of: request_path, request_query, request_body, request_headers"
1536+
)));
1537+
}
1538+
}
1539+
new_partitions.push(p);
1540+
}
1541+
}
1542+
1543+
tracing::debug!(
1544+
"HttpExec::with_expanded_params: replacing partitions with {} (was {}) for column '{col_name}'",
1545+
new_partitions.len(),
1546+
existing.len(),
1547+
);
1548+
1549+
Ok(Self::new(
1550+
Arc::clone(&self.projected_schema),
1551+
Arc::clone(&self.provider),
1552+
new_partitions,
1553+
self.limit,
1554+
))
1555+
}
1556+
14431557
async fn fetch_and_create_batch(
14441558
&self,
14451559
provider: &HttpTableProvider,
@@ -1737,10 +1851,16 @@ impl DisplayAs for HttpExec {
17371851
fn fmt_as(&self, _t: DisplayFormatType, f: &mut fmt::Formatter) -> std::fmt::Result {
17381852
write!(
17391853
f,
1740-
"HttpExec: base_url={}, format={}, partitions=[",
1854+
"HttpExec: base_url={}, format={}, ",
17411855
self.provider.base_url, self.provider.file_format
17421856
)?;
17431857

1858+
if self.deferred_partitions {
1859+
return write!(f, "partitions=deferred");
1860+
}
1861+
1862+
write!(f, "partitions=[")?;
1863+
17441864
for (i, (path, query, body, request_headers)) in self.partitions.iter().enumerate() {
17451865
if i > 0 {
17461866
write!(f, ", ")?;
@@ -6258,4 +6378,170 @@ mod tests {
62586378
let field_names: Vec<&str> = schema.fields().iter().map(|f| f.name().as_str()).collect();
62596379
assert_eq!(field_names, vec!["id", "name", "details"]);
62606380
}
6381+
6382+
// -----------------------------------------------------------------------
6383+
// with_expanded_params unit tests
6384+
// -----------------------------------------------------------------------
6385+
6386+
/// Helper: build an `HttpExec` with the given partitions and optional max.
6387+
///
6388+
/// Enables all filter types (path, query, body, headers) so that
6389+
/// `with_expanded_params` validation passes for any column.
6390+
fn make_exec(
6391+
partitions: Vec<PartitionSpec>,
6392+
max_request_partitions: Option<usize>,
6393+
) -> HttpExec {
6394+
let provider = base_provider()
6395+
.with_allowed_paths(["/*"])
6396+
.expect("valid path glob")
6397+
.enable_query_filters(DEFAULT_MAX_QUERY_LENGTH)
6398+
.enable_body_filters(DEFAULT_MAX_BODY_BYTES)
6399+
.enable_header_filters(DEFAULT_MAX_HEADERS_LENGTH, vec!["x-test"])
6400+
.expect("header filters should enable")
6401+
.with_max_request_partitions(max_request_partitions);
6402+
HttpExec::new(
6403+
HttpTableProvider::base_table_schema().into(),
6404+
Arc::new(provider),
6405+
partitions,
6406+
None,
6407+
)
6408+
}
6409+
6410+
#[test]
6411+
fn test_with_expanded_params_request_path() {
6412+
let exec = make_exec(vec![(None, None, None, None)], None);
6413+
let result = exec
6414+
.with_expanded_params("request_path", &["/a".to_string(), "/b".to_string()])
6415+
.expect("expand should succeed");
6416+
6417+
assert_eq!(result.partitions.len(), 2);
6418+
assert_eq!(result.partitions[0].0, Some("/a".to_string()));
6419+
assert_eq!(result.partitions[1].0, Some("/b".to_string()));
6420+
// Other tuple positions remain None.
6421+
assert_eq!(result.partitions[0].1, None);
6422+
assert_eq!(result.partitions[0].2, None);
6423+
assert_eq!(result.partitions[0].3, None);
6424+
}
6425+
6426+
#[test]
6427+
fn test_with_expanded_params_cross_product() {
6428+
let exec = make_exec(
6429+
vec![
6430+
(Some("/a".to_string()), None, None, None),
6431+
(Some("/b".to_string()), None, None, None),
6432+
],
6433+
None,
6434+
);
6435+
let result = exec
6436+
.with_expanded_params(
6437+
"request_query",
6438+
&["q1".to_string(), "q2".to_string(), "q3".to_string()],
6439+
)
6440+
.expect("expand should succeed");
6441+
6442+
// 2 existing × 3 values = 6 partitions
6443+
assert_eq!(result.partitions.len(), 6);
6444+
6445+
// First existing partition (/a) crossed with all three values
6446+
assert_eq!(result.partitions[0].0, Some("/a".to_string()));
6447+
assert_eq!(result.partitions[0].1, Some("q1".to_string()));
6448+
assert_eq!(result.partitions[1].0, Some("/a".to_string()));
6449+
assert_eq!(result.partitions[1].1, Some("q2".to_string()));
6450+
assert_eq!(result.partitions[2].0, Some("/a".to_string()));
6451+
assert_eq!(result.partitions[2].1, Some("q3".to_string()));
6452+
6453+
// Second existing partition (/b) crossed with all three values
6454+
assert_eq!(result.partitions[3].0, Some("/b".to_string()));
6455+
assert_eq!(result.partitions[3].1, Some("q1".to_string()));
6456+
assert_eq!(result.partitions[4].0, Some("/b".to_string()));
6457+
assert_eq!(result.partitions[4].1, Some("q2".to_string()));
6458+
assert_eq!(result.partitions[5].0, Some("/b".to_string()));
6459+
assert_eq!(result.partitions[5].1, Some("q3".to_string()));
6460+
}
6461+
6462+
#[test]
6463+
fn test_with_expanded_params_exceeds_max() {
6464+
// max=3, but 2 partitions × 2 query values = 4 → should fail
6465+
let exec = make_exec(
6466+
vec![
6467+
(Some("/a".to_string()), None, None, None),
6468+
(Some("/b".to_string()), None, None, None),
6469+
],
6470+
Some(3),
6471+
);
6472+
6473+
_ = exec
6474+
.with_expanded_params("request_query", &["q=1".to_string(), "q=2".to_string()])
6475+
.expect_err("should exceed max_request_partitions");
6476+
}
6477+
6478+
type PartitionAccessor = Box<dyn Fn(&PartitionSpec) -> &Option<String>>;
6479+
6480+
#[test]
6481+
fn test_with_expanded_params_all_columns() {
6482+
// Values must satisfy each column's validation rules:
6483+
// - request_path: must start with '/'
6484+
// - request_query: plain query string
6485+
// - request_body: plain body text
6486+
// - request_headers: JSON with allowed header names
6487+
let cases: Vec<(&str, &str, PartitionAccessor)> = vec![
6488+
("/val", "request_path", Box::new(|p: &PartitionSpec| &p.0)),
6489+
("val", "request_query", Box::new(|p: &PartitionSpec| &p.1)),
6490+
("val", "request_body", Box::new(|p: &PartitionSpec| &p.2)),
6491+
(
6492+
r#"{"x-test":"val"}"#,
6493+
"request_headers",
6494+
Box::new(|p: &PartitionSpec| &p.3),
6495+
),
6496+
];
6497+
6498+
for (test_value, col_name, accessor) in &cases {
6499+
let exec = make_exec(vec![(None, None, None, None)], None);
6500+
let result = exec
6501+
.with_expanded_params(col_name, &[test_value.to_string()])
6502+
.unwrap_or_else(|e| panic!("expand for {col_name} should succeed: {e}"));
6503+
6504+
assert_eq!(
6505+
result.partitions.len(),
6506+
1,
6507+
"one partition expected for {col_name}"
6508+
);
6509+
assert_eq!(
6510+
*accessor(&result.partitions[0]),
6511+
Some(test_value.to_string()),
6512+
"{col_name} should be set"
6513+
);
6514+
6515+
// Verify the OTHER positions are still None.
6516+
let all_accessors: Vec<PartitionAccessor> = vec![
6517+
Box::new(|p: &PartitionSpec| &p.0),
6518+
Box::new(|p: &PartitionSpec| &p.1),
6519+
Box::new(|p: &PartitionSpec| &p.2),
6520+
Box::new(|p: &PartitionSpec| &p.3),
6521+
];
6522+
let col_names = [
6523+
"request_path",
6524+
"request_query",
6525+
"request_body",
6526+
"request_headers",
6527+
];
6528+
for (other_name, other_accessor) in col_names.iter().zip(all_accessors.iter()) {
6529+
if *other_name != *col_name {
6530+
assert_eq!(
6531+
*other_accessor(&result.partitions[0]),
6532+
None,
6533+
"{other_name} should remain None when expanding {col_name}"
6534+
);
6535+
}
6536+
}
6537+
}
6538+
}
6539+
6540+
#[test]
6541+
fn test_with_expanded_params_unknown_column_errors() {
6542+
let exec = make_exec(vec![(Some("/orig".to_string()), None, None, None)], None);
6543+
let _ = exec
6544+
.with_expanded_params("nonexistent_column", &["x".to_string()])
6545+
.expect_err("unknown column should error");
6546+
}
62616547
}

crates/datafusion-optimizer-rules/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ tracing.workspace = true
3434
[dev-dependencies]
3535
arrow-flight.workspace = true
3636
insta.workspace = true
37+
reqwest.workspace = true
3738
tonic.workspace = true
3839

3940

0 commit comments

Comments
 (0)