Skip to content

Commit af3a079

Browse files
mingjerli and claude committed
feat: Improve star (*) handling in cross-query lineage with EXCEPT/REPLACE support
- Enhanced _add_cross_query_edges to properly handle SELECT * in pipelines
- Added support for SELECT * EXCEPT to exclude columns from cross-query edges
- Added support for SELECT * REPLACE to maintain lineage for transformed columns
- Fixed issue where upstream columns weren't connected to * in downstream queries
- Copied is_star, except_columns, and replace_columns fields to pipeline nodes

Tests:
- Added test_star_expansion_in_cross_query_edges
- Added test_star_except_in_cross_query_edges
- Added test_star_replace_in_cross_query_edges
- All 44 tests in test_multi_query.py pass

This ensures that:
- Star notation is properly expanded in cross-query scenarios
- SELECT * EXCEPT correctly excludes specified columns from lineage
- SELECT * REPLACE maintains proper lineage for transformed columns
- Backward tracing correctly follows through all pipeline layers

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
1 parent 01750ef commit af3a079

2 files changed

Lines changed: 220 additions & 1 deletion

File tree

src/clgraph/pipeline.py

Lines changed: 55 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -149,6 +149,10 @@ def _add_query_columns(
149149
owner=owner,
150150
tags=tags,
151151
custom_metadata=custom_metadata,
152+
# Star expansion fields
153+
is_star=node.is_star,
154+
except_columns=node.except_columns,
155+
replace_columns=node.replace_columns,
152156
)
153157
pipeline.add_column(column)
154158

@@ -187,6 +191,7 @@ def _add_cross_query_edges(self, pipeline: "Pipeline"):
187191
- For each column C in Q1's output:
188192
- For each query Qi that reads T:
189193
- If Qi references T.C, create edge: Q1.C -> Qi.C
194+
- If Qi references T.*, create edges: Q1.C -> Qi.* for ALL columns
190195
"""
191196
for table_name, table_node in pipeline.table_graph.tables.items():
192197
# Find query that creates this table
@@ -204,7 +209,56 @@ def _add_cross_query_edges(self, pipeline: "Pipeline"):
204209

205210
# Find queries that read this table
206211
for reading_query_id in table_node.read_by:
207-
# Match columns by name
212+
# Check if reading query has a * column for this table (input layer)
213+
input_star_column = None
214+
for col in pipeline.columns.values():
215+
if (
216+
col.query_id == reading_query_id
217+
and col.table_name == table_name
218+
and col.column_name == "*"
219+
and col.layer == "input"
220+
):
221+
input_star_column = col
222+
break
223+
224+
# Also check if there's an output * in the same query (has EXCEPT/REPLACE)
225+
# This is for queries like: SELECT * EXCEPT (...) FROM table
226+
output_star_column = None
227+
if input_star_column:
228+
for col in pipeline.columns.values():
229+
if (
230+
col.query_id == reading_query_id
231+
and col.column_name == "*"
232+
and col.layer == "output"
233+
):
234+
output_star_column = col
235+
break
236+
237+
# Use output * for EXCEPT/REPLACE info, but connect to input *
238+
star_column = input_star_column
239+
except_columns = output_star_column.except_columns if output_star_column else set()
240+
241+
# If there's a star column, connect all output columns to it
242+
# BUT respect EXCEPT clause - skip columns that are excepted
243+
if star_column:
244+
for output_col in output_columns:
245+
# Skip columns in EXCEPT clause
246+
if output_col.column_name in except_columns:
247+
continue
248+
249+
edge = ColumnEdge(
250+
from_node=output_col,
251+
to_node=star_column,
252+
edge_type="cross_query",
253+
context="cross_query",
254+
transformation=f"{creating_query_id} -> {reading_query_id}",
255+
query_id=None, # Cross-query edge
256+
)
257+
pipeline.add_edge(edge)
258+
259+
# ALWAYS match columns by name (not just when there's no star)
260+
# This handles cases where the query uses both * (for COUNT(*))
261+
# and specific columns (for SUM(amount), etc.)
208262
for output_col in output_columns:
209263
# Find corresponding input column in reading query
210264
# Search for this column in reading query's lineage

tests/test_multi_query.py

Lines changed: 165 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -909,6 +909,171 @@ def test_get_lineage_path(self):
909909
# Should have edges connecting them
910910
assert len(path) > 0
911911

912+
def test_star_expansion_in_cross_query_edges(self):
913+
"""
914+
Test that * is properly handled in cross-query lineage.
915+
916+
When upstream query creates a table with known columns, and downstream
917+
query uses both * (for COUNT(*)) and specific columns (for SUM(amount)),
918+
we should create edges for both:
919+
1. All upstream columns -> * node (for COUNT(*))
920+
2. Specific column matches (for individual column references)
921+
"""
922+
queries = [
923+
"""
924+
CREATE TABLE staging.user_orders AS
925+
SELECT
926+
user_id,
927+
order_id,
928+
amount,
929+
order_date
930+
FROM raw.orders
931+
WHERE status = 'completed'
932+
""",
933+
"""
934+
CREATE TABLE analytics.user_metrics AS
935+
SELECT
936+
user_id,
937+
COUNT(*) as order_count,
938+
SUM(amount) as total_revenue,
939+
AVG(amount) as avg_order_value
940+
FROM staging.user_orders
941+
GROUP BY user_id
942+
""",
943+
]
944+
945+
parser = MultiQueryParser()
946+
table_graph = parser.parse_queries(queries)
947+
948+
builder = PipelineLineageBuilder()
949+
pipeline = builder.build(table_graph)
950+
951+
# Verify that cross-query edges include connections to * column
952+
cross_query_edges = [e for e in pipeline.edges if e.query_id is None]
953+
954+
# Should have edges from all staging.user_orders columns to the * node
955+
star_edges = [
956+
e
957+
for e in cross_query_edges
958+
if e.to_node.column_name == "*" and e.to_node.table_name == "staging.user_orders"
959+
]
960+
# All 4 columns from query_0 should connect to the * in query_1
961+
assert len(star_edges) == 4
962+
963+
# Should ALSO have edges for specifically referenced columns (user_id, amount)
964+
specific_edges = [
965+
e
966+
for e in cross_query_edges
967+
if e.to_node.column_name in ("user_id", "amount")
968+
and e.to_node.table_name == "staging.user_orders"
969+
and e.to_node.layer == "input"
970+
]
971+
# user_id and amount should each have an edge
972+
assert len(specific_edges) >= 2
973+
974+
# Verify backward lineage traces all the way to source
975+
sources = pipeline.trace_column_backward("analytics.user_metrics", "total_revenue")
976+
assert any(s.table_name == "raw.orders" and s.column_name == "amount" for s in sources)
977+
978+
def test_star_except_in_cross_query_edges(self):
979+
"""
980+
Test that SELECT * EXCEPT properly excludes columns in cross-query lineage.
981+
982+
When downstream query uses SELECT * EXCEPT (col1, col2), we should NOT
983+
create cross-query edges for the excepted columns.
984+
"""
985+
queries = [
986+
"""
987+
CREATE TABLE staging.orders AS
988+
SELECT
989+
order_id,
990+
user_id,
991+
amount,
992+
sensitive_data,
993+
order_date
994+
FROM raw.orders
995+
""",
996+
"""
997+
CREATE TABLE analytics.clean_orders AS
998+
SELECT * EXCEPT (sensitive_data)
999+
FROM staging.orders
1000+
""",
1001+
]
1002+
1003+
parser = MultiQueryParser()
1004+
table_graph = parser.parse_queries(queries)
1005+
1006+
builder = PipelineLineageBuilder()
1007+
pipeline = builder.build(table_graph)
1008+
1009+
# Verify that cross-query edges exclude sensitive_data
1010+
cross_query_edges = [e for e in pipeline.edges if e.query_id is None]
1011+
1012+
# Get edges from staging.orders to the * in analytics
1013+
star_edges = [
1014+
e
1015+
for e in cross_query_edges
1016+
if e.to_node.column_name == "*" and e.to_node.table_name == "staging.orders"
1017+
]
1018+
1019+
# Should have edges for order_id, user_id, amount, order_date
1020+
# Should NOT have edge for sensitive_data
1021+
edge_from_columns = {e.from_node.column_name for e in star_edges}
1022+
assert "order_id" in edge_from_columns
1023+
assert "user_id" in edge_from_columns
1024+
assert "amount" in edge_from_columns
1025+
assert "order_date" in edge_from_columns
1026+
assert "sensitive_data" not in edge_from_columns # This should be excluded!
1027+
1028+
def test_star_replace_in_cross_query_edges(self):
1029+
"""
1030+
Test that SELECT * REPLACE maintains lineage for replaced columns.
1031+
1032+
REPLACE doesn't remove columns, it transforms them. Cross-query edges
1033+
should still exist for replaced columns.
1034+
"""
1035+
queries = [
1036+
"""
1037+
CREATE TABLE staging.orders AS
1038+
SELECT
1039+
order_id,
1040+
user_id,
1041+
amount,
1042+
status,
1043+
order_date
1044+
FROM raw.orders
1045+
""",
1046+
"""
1047+
CREATE TABLE analytics.orders_normalized AS
1048+
SELECT * REPLACE (UPPER(status) as status)
1049+
FROM staging.orders
1050+
""",
1051+
]
1052+
1053+
parser = MultiQueryParser()
1054+
table_graph = parser.parse_queries(queries)
1055+
1056+
builder = PipelineLineageBuilder()
1057+
pipeline = builder.build(table_graph)
1058+
1059+
# Verify that cross-query edges include ALL columns (including status)
1060+
cross_query_edges = [e for e in pipeline.edges if e.query_id is None]
1061+
1062+
# Get edges from staging.orders to the * in analytics
1063+
star_edges = [
1064+
e
1065+
for e in cross_query_edges
1066+
if e.to_node.column_name == "*" and e.to_node.table_name == "staging.orders"
1067+
]
1068+
1069+
# Should have edges for ALL columns including the replaced one
1070+
edge_from_columns = {e.from_node.column_name for e in star_edges}
1071+
assert "order_id" in edge_from_columns
1072+
assert "user_id" in edge_from_columns
1073+
assert "amount" in edge_from_columns
1074+
assert "status" in edge_from_columns # Replaced column should still have edge
1075+
assert "order_date" in edge_from_columns
1076+
9121077

9131078
# ============================================================================
9141079
# Part 4: Edge Cases Tests

0 commit comments

Comments
 (0)