Skip to content

Commit e0f4ba1

Browse files
committed
fix: extend column-existence validator to 20+ transforms
The validator added in 94c19f8 only covered xf.fill_forward, xf.fill_backward, xf.fill_constant, and xf.cast. Every other transform that names input columns by string (xf.distinct, xf.sort, xf.project, xf.drop, xf.keep, xf.rename, xf.aggregate, xf.pivot, xf.unpivot, xf.row_hash, xf.url.parse, xf.ip.parse, xf.cdc.scd1, xf.cdc.scd2, xf.cdc.compare, the whole window family, and the join leftKey) slipped through and was only caught by DuckDB at runtime as "Binder Error: Referenced column not found". This was user-reported: an xf.distinct configured with a column name that didn't exist in the upstream CSV (a typo or stale config) ran for 85 ms before failing inside DuckDB with the cryptic runtime error, instead of failing at planner time with a clear "column 'X' not found in upstream" message. The validator only fires when the upstream's column set is actually known (declared by autodetect on the source and propagated through the existing derive_output_columns). Joins skip right-side keys because lookup-input columns aren't propagated through the planner today; only the main-side leftKey is checked. False negatives are preferable to false positives here.
1 parent de5c72b commit e0f4ba1

1 file changed

Lines changed: 133 additions & 8 deletions

File tree

crates/duckdb-engine/src/plan.rs

Lines changed: 133 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1561,12 +1561,29 @@ fn validate_column_refs(
15611561
c, case_hint
15621562
))
15631563
};
1564-
match component_id {
1565-
"xf.fill_forward" | "xf.fill_backward" | "xf.fill_constant" => {
1566-
if let Some(c) = p.get("column").and_then(JsonValue::as_str) {
1564+
// Helper for components whose props expose a single "column" key.
1565+
let check_single_col = |p: &JsonValue| -> Result<(), String> {
1566+
if let Some(c) = p.get("column").and_then(JsonValue::as_str) {
1567+
let c = c.trim();
1568+
if !c.is_empty() {
15671569
check(c)?;
15681570
}
15691571
}
1572+
Ok(())
1573+
};
1574+
let check_list = |key: &str| -> Result<(), String> {
1575+
for c in columns_list(p, key) {
1576+
let c = c.trim();
1577+
if !c.is_empty() {
1578+
check(c)?;
1579+
}
1580+
}
1581+
Ok(())
1582+
};
1583+
match component_id {
1584+
"xf.fill_forward" | "xf.fill_backward" | "xf.fill_constant" => {
1585+
check_single_col(p)?;
1586+
}
15701587
"xf.cast" => {
15711588
// Multi-row form
15721589
if let Some(arr) = p.get("casts").or_else(|| p.get("columns")).and_then(JsonValue::as_array) {
@@ -1579,11 +1596,119 @@ fn validate_column_refs(
15791596
}
15801597
}
15811598
}
1582-
// Single-row form
1583-
if let Some(c) = p.get("column").and_then(JsonValue::as_str) {
1584-
let c = c.trim();
1585-
if !c.is_empty() {
1586-
check(c)?;
1599+
check_single_col(p)?;
1600+
}
1601+
"xf.distinct" | "xf.drop" | "xf.keep" | "xf.unpivot" | "xf.row_hash" => {
1602+
check_list("columns")?;
1603+
}
1604+
"xf.project" => {
1605+
check_list("columns")?;
1606+
check_list("keep")?;
1607+
}
1608+
"xf.sort" => {
1609+
// orderBy is either an array of column-name strings or
1610+
// an array of {column, direction} objects. Validate both.
1611+
if let Some(arr) = p.get("orderBy").and_then(JsonValue::as_array) {
1612+
for entry in arr {
1613+
let c = entry
1614+
.as_str()
1615+
.map(|s| s.to_string())
1616+
.or_else(|| {
1617+
entry
1618+
.get("column")
1619+
.and_then(JsonValue::as_str)
1620+
.map(|s| s.to_string())
1621+
});
1622+
if let Some(c) = c {
1623+
let c = c.trim();
1624+
if !c.is_empty() {
1625+
check(c)?;
1626+
}
1627+
}
1628+
}
1629+
}
1630+
}
1631+
"xf.rename" => {
1632+
if let Some(arr) = p.get("renames").and_then(JsonValue::as_array) {
1633+
for entry in arr {
1634+
if let Some(c) = entry.get("from").and_then(JsonValue::as_str) {
1635+
let c = c.trim();
1636+
if !c.is_empty() {
1637+
check(c)?;
1638+
}
1639+
}
1640+
}
1641+
}
1642+
}
1643+
"xf.aggregate" => {
1644+
check_list("groupBy")?;
1645+
// aggregateColumns: [{column, fn}, ...] - check the column field.
1646+
if let Some(arr) = p.get("aggregateColumns").and_then(JsonValue::as_array) {
1647+
for entry in arr {
1648+
if let Some(c) = entry.get("column").and_then(JsonValue::as_str) {
1649+
let c = c.trim();
1650+
if !c.is_empty() {
1651+
check(c)?;
1652+
}
1653+
}
1654+
}
1655+
}
1656+
}
1657+
"xf.pivot" => {
1658+
check_single_col(p)?;
1659+
for key in ["pivotColumn", "valueColumn", "valuesColumn"] {
1660+
if let Some(c) = p.get(key).and_then(JsonValue::as_str) {
1661+
let c = c.trim();
1662+
if !c.is_empty() {
1663+
check(c)?;
1664+
}
1665+
}
1666+
}
1667+
check_list("groupBy")?;
1668+
}
1669+
"xf.url.parse" | "xf.ip.parse" => {
1670+
check_single_col(p)?;
1671+
}
1672+
"xf.cdc.scd1" | "xf.cdc.scd2" | "xf.cdc.compare" => {
1673+
check_list("naturalKey")?;
1674+
check_list("compareColumns")?;
1675+
}
1676+
// Window family: partitionBy + orderBy are upstream columns.
1677+
// `column` is the column the function operates on (lead/lag/
1678+
// first/last) - present on a subset.
1679+
"xf.window"
1680+
| "xf.rownum"
1681+
| "xf.rank"
1682+
| "xf.denserank"
1683+
| "xf.lead"
1684+
| "xf.lag"
1685+
| "xf.first"
1686+
| "xf.last"
1687+
| "xf.ntile"
1688+
| "xf.rank.filter"
1689+
| "xf.cumulative"
1690+
| "xf.aggwin" => {
1691+
check_list("partitionBy")?;
1692+
check_list("orderBy")?;
1693+
check_single_col(p)?;
1694+
}
1695+
// Join keys on the left side. Right-side keys reference the
1696+
// lookup input, whose columns we don't currently propagate
1697+
// through the planner; skip those rather than emit a false
1698+
// positive.
1699+
"xf.join"
1700+
| "xf.join.left"
1701+
| "xf.join.right"
1702+
| "xf.join.full"
1703+
| "xf.join.cross"
1704+
| "xf.semi"
1705+
| "xf.anti" => {
1706+
if let Some(s) = p.get("leftKey").and_then(JsonValue::as_str) {
1707+
for k in s.split(',') {
1708+
let k = k.trim();
1709+
if !k.is_empty() {
1710+
check(k)?;
1711+
}
15871712
}
15881713
}
15891714
}

0 commit comments

Comments
 (0)