Skip to content

Commit f0384c7

Browse files
committed
Improve
1 parent 28c72d7 commit f0384c7

9 files changed

Lines changed: 556 additions & 0 deletions

datafusion-federation/src/analyzer/mod.rs

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,25 @@ fn get_plan_provider_recursively(
499499
let mut providers: HashMap<TableReference, Arc<dyn FederationProvider>> = HashMap::new();
500500

501501
plan.apply_with_subqueries(&mut |p: &LogicalPlan| -> Result<TreeNodeRecursion> {
502+
// Register SubqueryAlias names (e.g. `lineitem l1`) so that OuterReferenceColumn resolved
503+
// against the alias (e.g. `l1.l_orderkey`) can find the correct provider. Without this,
504+
// correlated subqueries that reference an aliased outer table mark the scan as Ambiguous,
505+
// breaking same-provider federation.
506+
if let LogicalPlan::SubqueryAlias(subquery_alias) = p {
507+
let alias_ref = TableReference::bare(subquery_alias.alias.table().to_string());
508+
subquery_alias
509+
.input
510+
.apply(&mut |child| -> Result<TreeNodeRecursion> {
511+
if let (Some(provider), Some(table_reference)) = get_leaf_provider(child)? {
512+
providers.insert(alias_ref.clone(), Arc::clone(&provider));
513+
providers.insert(table_reference, provider);
514+
return Ok(TreeNodeRecursion::Stop);
515+
}
516+
Ok(TreeNodeRecursion::Continue)
517+
})?;
518+
return Ok(TreeNodeRecursion::Continue);
519+
}
520+
502521
if let (Some(federation_provider), Some(table_reference)) = get_leaf_provider(p)? {
503522
providers.insert(table_reference, federation_provider);
504523
}

datafusion-federation/src/sql/mod.rs

Lines changed: 347 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1032,4 +1032,351 @@ mod tests {
10321032

10331033
Ok(())
10341034
}
1035+
1036+
// --- EXISTS / NOT EXISTS federation tests ---
1037+
1038+
fn make_table_with_schema(
1039+
name: &str,
1040+
schema: SchemaRef,
1041+
executor: &TestExecutor,
1042+
) -> Arc<dyn TableProvider> {
1043+
let table_ref = RemoteTableRef::try_from(name.to_string()).unwrap();
1044+
let table = Arc::new(RemoteTable::new(table_ref, schema));
1045+
let provider = Arc::new(SQLFederationProvider::new(Arc::new(executor.clone())));
1046+
let table_source = Arc::new(SQLTableSource { provider, table });
1047+
Arc::new(FederatedTableProviderAdaptor::new(table_source))
1048+
}
1049+
1050+
fn orders_schema() -> SchemaRef {
1051+
Arc::new(Schema::new(vec![
1052+
Field::new("o_orderkey", DataType::Int64, false),
1053+
Field::new("o_custkey", DataType::Int64, false),
1054+
Field::new("o_orderstatus", DataType::Utf8, false),
1055+
Field::new("o_orderdate", DataType::Date32, true),
1056+
]))
1057+
}
1058+
1059+
fn lineitem_schema() -> SchemaRef {
1060+
Arc::new(Schema::new(vec![
1061+
Field::new("l_orderkey", DataType::Int64, false),
1062+
Field::new("l_suppkey", DataType::Int64, false),
1063+
Field::new("l_commitdate", DataType::Date32, true),
1064+
Field::new("l_receiptdate", DataType::Date32, true),
1065+
]))
1066+
}
1067+
1068+
fn supplier_schema() -> SchemaRef {
1069+
Arc::new(Schema::new(vec![
1070+
Field::new("s_suppkey", DataType::Int64, false),
1071+
Field::new("s_name", DataType::Utf8, false),
1072+
Field::new("s_nationkey", DataType::Int64, false),
1073+
]))
1074+
}
1075+
1076+
/// Runs `EXPLAIN <query>`, collects the output, and returns a formatted
1077+
/// string containing both logical and physical plans.
1078+
async fn explain_query(ctx: &SessionContext, query: &str) -> String {
1079+
let explain_sql = format!("EXPLAIN {query}");
1080+
let batches = ctx
1081+
.sql(&explain_sql)
1082+
.await
1083+
.unwrap()
1084+
.collect()
1085+
.await
1086+
.unwrap();
1087+
let formatted =
1088+
datafusion::arrow::util::pretty::pretty_format_batches(&batches).unwrap();
1089+
formatted.to_string()
1090+
}
1091+
1092+
/// Same-provider EXISTS: both tables from the same compute context.
1093+
/// The entire plan should be federated as a single unit so the backend
1094+
/// can decorrelate EXISTS into a semi-join.
1095+
#[tokio::test]
1096+
async fn same_provider_exists_federated_as_single_unit() {
1097+
let executor_a = TestExecutor {
1098+
compute_context: "ctx_a".into(),
1099+
cannot_federate: None,
1100+
};
1101+
1102+
let state = crate::default_session_state();
1103+
let ctx = SessionContext::new_with_state(state);
1104+
1105+
ctx.register_table(
1106+
"orders",
1107+
make_table_with_schema("orders", orders_schema(), &executor_a),
1108+
)
1109+
.unwrap();
1110+
ctx.register_table(
1111+
"lineitem",
1112+
make_table_with_schema("lineitem", lineitem_schema(), &executor_a),
1113+
)
1114+
.unwrap();
1115+
1116+
insta::assert_snapshot!(
1117+
"same_provider_exists",
1118+
explain_query(
1119+
&ctx,
1120+
"SELECT o_orderkey FROM orders WHERE EXISTS \
1121+
(SELECT 1 FROM lineitem WHERE l_orderkey = o_orderkey)",
1122+
)
1123+
.await
1124+
);
1125+
}
1126+
1127+
/// Same-provider NOT EXISTS: mirrors TPC-H Q21 structure.
1128+
/// Must be federated as one unit so the backend can decorrelate to anti-join.
1129+
#[tokio::test]
1130+
async fn same_provider_not_exists_federated_as_single_unit() {
1131+
let executor_a = TestExecutor {
1132+
compute_context: "ctx_a".into(),
1133+
cannot_federate: None,
1134+
};
1135+
1136+
let state = crate::default_session_state();
1137+
let ctx = SessionContext::new_with_state(state);
1138+
1139+
ctx.register_table(
1140+
"supplier",
1141+
make_table_with_schema("supplier", supplier_schema(), &executor_a),
1142+
)
1143+
.unwrap();
1144+
ctx.register_table(
1145+
"lineitem",
1146+
make_table_with_schema("lineitem", lineitem_schema(), &executor_a),
1147+
)
1148+
.unwrap();
1149+
1150+
insta::assert_snapshot!(
1151+
"same_provider_not_exists",
1152+
explain_query(
1153+
&ctx,
1154+
"SELECT s_name FROM supplier WHERE NOT EXISTS \
1155+
(SELECT 1 FROM lineitem WHERE l_suppkey = s_suppkey \
1156+
AND l_receiptdate > l_commitdate)",
1157+
)
1158+
.await
1159+
);
1160+
}
1161+
1162+
/// Cross-provider EXISTS: outer table on provider A, subquery table on provider B.
1163+
/// Each side must be independently federated (multiple Federated nodes).
1164+
#[tokio::test]
1165+
async fn cross_provider_exists_separately_federated() {
1166+
let executor_a = TestExecutor {
1167+
compute_context: "ctx_a".into(),
1168+
cannot_federate: None,
1169+
};
1170+
let executor_b = TestExecutor {
1171+
compute_context: "ctx_b".into(),
1172+
cannot_federate: None,
1173+
};
1174+
1175+
let state = crate::default_session_state();
1176+
let ctx = SessionContext::new_with_state(state);
1177+
1178+
ctx.register_table(
1179+
"orders",
1180+
make_table_with_schema("orders", orders_schema(), &executor_a),
1181+
)
1182+
.unwrap();
1183+
ctx.register_table(
1184+
"lineitem",
1185+
make_table_with_schema("lineitem", lineitem_schema(), &executor_b),
1186+
)
1187+
.unwrap();
1188+
1189+
insta::assert_snapshot!(
1190+
"cross_provider_exists",
1191+
explain_query(
1192+
&ctx,
1193+
"SELECT o_orderkey FROM orders WHERE EXISTS \
1194+
(SELECT 1 FROM lineitem WHERE l_orderkey = o_orderkey)",
1195+
)
1196+
.await
1197+
);
1198+
}
1199+
1200+
/// Cross-provider NOT EXISTS: outer table on provider A, subquery table on provider B.
1201+
#[tokio::test]
1202+
async fn cross_provider_not_exists_separately_federated() {
1203+
let executor_a = TestExecutor {
1204+
compute_context: "ctx_a".into(),
1205+
cannot_federate: None,
1206+
};
1207+
let executor_b = TestExecutor {
1208+
compute_context: "ctx_b".into(),
1209+
cannot_federate: None,
1210+
};
1211+
1212+
let state = crate::default_session_state();
1213+
let ctx = SessionContext::new_with_state(state);
1214+
1215+
ctx.register_table(
1216+
"orders",
1217+
make_table_with_schema("orders", orders_schema(), &executor_a),
1218+
)
1219+
.unwrap();
1220+
ctx.register_table(
1221+
"lineitem",
1222+
make_table_with_schema("lineitem", lineitem_schema(), &executor_b),
1223+
)
1224+
.unwrap();
1225+
1226+
insta::assert_snapshot!(
1227+
"cross_provider_not_exists",
1228+
explain_query(
1229+
&ctx,
1230+
"SELECT o_orderkey FROM orders WHERE NOT EXISTS \
1231+
(SELECT 1 FROM lineitem WHERE l_orderkey = o_orderkey)",
1232+
)
1233+
.await
1234+
);
1235+
}
1236+
1237+
/// TPC-H Q21 pattern: same-provider JOIN + NOT EXISTS + EXISTS.
1238+
/// All tables on the same provider. The entire plan must be federated
1239+
/// as a single unit so the backend handles decorrelation.
1240+
#[tokio::test]
1241+
async fn same_provider_join_with_not_exists_federated_as_single_unit() {
1242+
let executor_a = TestExecutor {
1243+
compute_context: "ctx_a".into(),
1244+
cannot_federate: None,
1245+
};
1246+
1247+
let state = crate::default_session_state();
1248+
let ctx = SessionContext::new_with_state(state);
1249+
1250+
ctx.register_table(
1251+
"supplier",
1252+
make_table_with_schema("supplier", supplier_schema(), &executor_a),
1253+
)
1254+
.unwrap();
1255+
ctx.register_table(
1256+
"lineitem",
1257+
make_table_with_schema("lineitem", lineitem_schema(), &executor_a),
1258+
)
1259+
.unwrap();
1260+
ctx.register_table(
1261+
"orders",
1262+
make_table_with_schema("orders", orders_schema(), &executor_a),
1263+
)
1264+
.unwrap();
1265+
1266+
insta::assert_snapshot!(
1267+
"same_provider_join_not_exists",
1268+
explain_query(
1269+
&ctx,
1270+
"SELECT s_name \
1271+
FROM supplier \
1272+
JOIN lineitem ON s_suppkey = l_suppkey \
1273+
JOIN orders ON o_orderkey = l_orderkey \
1274+
WHERE NOT EXISTS ( \
1275+
SELECT 1 FROM lineitem AS l2 \
1276+
WHERE l2.l_orderkey = lineitem.l_orderkey \
1277+
AND l2.l_suppkey <> lineitem.l_suppkey \
1278+
)",
1279+
)
1280+
.await
1281+
);
1282+
}
1283+
1284+
/// Mixed providers: JOIN across providers + EXISTS subquery on a different provider.
1285+
/// supplier(ctx_a) JOIN lineitem(ctx_b) WHERE EXISTS on orders(ctx_a).
1286+
#[tokio::test]
1287+
async fn mixed_provider_join_with_exists() {
1288+
let executor_a = TestExecutor {
1289+
compute_context: "ctx_a".into(),
1290+
cannot_federate: None,
1291+
};
1292+
let executor_b = TestExecutor {
1293+
compute_context: "ctx_b".into(),
1294+
cannot_federate: None,
1295+
};
1296+
1297+
let state = crate::default_session_state();
1298+
let ctx = SessionContext::new_with_state(state);
1299+
1300+
ctx.register_table(
1301+
"supplier",
1302+
make_table_with_schema("supplier", supplier_schema(), &executor_a),
1303+
)
1304+
.unwrap();
1305+
ctx.register_table(
1306+
"lineitem",
1307+
make_table_with_schema("lineitem", lineitem_schema(), &executor_b),
1308+
)
1309+
.unwrap();
1310+
ctx.register_table(
1311+
"orders",
1312+
make_table_with_schema("orders", orders_schema(), &executor_a),
1313+
)
1314+
.unwrap();
1315+
1316+
insta::assert_snapshot!(
1317+
"mixed_provider_join_exists",
1318+
explain_query(
1319+
&ctx,
1320+
"SELECT s_name \
1321+
FROM supplier \
1322+
JOIN lineitem ON l_suppkey = s_suppkey \
1323+
WHERE EXISTS ( \
1324+
SELECT 1 FROM orders \
1325+
WHERE o_custkey = s_suppkey \
1326+
)",
1327+
)
1328+
.await
1329+
);
1330+
}
1331+
1332+
/// Same-provider NOT EXISTS with aliased outer table.
1333+
/// The outer query uses `lineitem l1` (alias). The NOT EXISTS subquery
1334+
/// references the alias: `l1.l_orderkey`. All tables are same provider.
1335+
#[tokio::test]
1336+
async fn same_provider_aliased_table_not_exists() {
1337+
let executor_a = TestExecutor {
1338+
compute_context: "ctx_a".into(),
1339+
cannot_federate: None,
1340+
};
1341+
1342+
let state = crate::default_session_state();
1343+
let ctx = SessionContext::new_with_state(state);
1344+
1345+
ctx.register_table(
1346+
"supplier",
1347+
make_table_with_schema("supplier", supplier_schema(), &executor_a),
1348+
)
1349+
.unwrap();
1350+
ctx.register_table(
1351+
"lineitem",
1352+
make_table_with_schema("lineitem", lineitem_schema(), &executor_a),
1353+
)
1354+
.unwrap();
1355+
ctx.register_table(
1356+
"orders",
1357+
make_table_with_schema("orders", orders_schema(), &executor_a),
1358+
)
1359+
.unwrap();
1360+
1361+
// TPC-H Q21 pattern: aliased lineitem l1, NOT EXISTS references l1.*
1362+
insta::assert_snapshot!(
1363+
"same_provider_aliased_not_exists",
1364+
explain_query(
1365+
&ctx,
1366+
"SELECT s_name, count(*) AS numwait \
1367+
FROM supplier, lineitem l1, orders \
1368+
WHERE s_suppkey = l1.l_suppkey \
1369+
AND l1.l_orderkey = o_orderkey \
1370+
AND l1.l_receiptdate > l1.l_commitdate \
1371+
AND NOT EXISTS ( \
1372+
SELECT 1 FROM lineitem l2 \
1373+
WHERE l2.l_orderkey = l1.l_orderkey \
1374+
AND l2.l_suppkey <> l1.l_suppkey \
1375+
) \
1376+
GROUP BY s_name \
1377+
ORDER BY numwait DESC, s_name",
1378+
)
1379+
.await
1380+
);
1381+
}
10351382
}

0 commit comments

Comments
 (0)