Skip to content

Commit 0105501

Browse files
mingjerliclaude
andcommitted
fix: Resolve unqualified columns in JOIN queries using schema info
When a SQL query joins multiple tables and columns are unqualified (no table prefix), the lineage builder previously defaulted to the first table, which was often incorrect. For example, in: SELECT DATE_TRUNC(order_date, MONTH) as month FROM analytics.user_metrics JOIN staging.user_orders USING (user_id) The `order_date` column would be incorrectly attributed to `user_metrics` instead of `user_orders`, causing lineage edges to be dropped. This fix: 1. Uses sqlglot's qualify_columns optimizer with upstream table schemas to add correct table prefixes before building lineage 2. Fixes _extract_select_from_query to use dialect when serializing SQL, which was causing DATE_TRUNC arguments to be reordered incorrectly The 3-layer pipeline example now correctly shows: - staging.user_orders.order_date -> reports.monthly_revenue.month - trace_column_backward returns raw.orders.order_date Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1 parent b88fb0d commit 0105501

3 files changed

Lines changed: 552 additions & 6 deletions

File tree

src/clgraph/lineage_builder.py

Lines changed: 107 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
import sqlglot
1111
from sqlglot import exp
12+
from sqlglot.optimizer import qualify_columns
1213

1314
from .metadata_parser import MetadataExtractor
1415
from .models import (
@@ -463,6 +464,106 @@ def build_parent_map(node: exp.Expression, parent: Optional[exp.Expression] = No
463464
return outermost_nested
464465

465466

467+
def _convert_to_nested_schema(
468+
flat_schema: Dict[str, List[str]],
469+
) -> Dict[str, Dict[str, Dict[str, str]]]:
470+
"""
471+
Convert flat table schema to nested format for sqlglot optimizer.
472+
473+
The sqlglot optimizer.qualify_columns requires a nested schema format:
474+
{
475+
"schema_name": {
476+
"table_name": {
477+
"column_name": "type"
478+
}
479+
}
480+
}
481+
482+
Our flat format is:
483+
{
484+
"schema.table": ["col1", "col2", ...]
485+
}
486+
487+
Args:
488+
flat_schema: Dict mapping "schema.table" to list of column names
489+
490+
Returns:
491+
Nested schema dict suitable for sqlglot optimizer
492+
"""
493+
nested: Dict[str, Dict[str, Dict[str, str]]] = {}
494+
495+
for qualified_table, columns in flat_schema.items():
496+
parts = qualified_table.split(".")
497+
498+
if len(parts) >= 2:
499+
# Has schema prefix: "schema.table" or "catalog.schema.table"
500+
schema_name = parts[-2] # Second to last part
501+
table_name = parts[-1] # Last part
502+
else:
503+
# No schema prefix - use empty string as schema
504+
schema_name = ""
505+
table_name = qualified_table
506+
507+
if schema_name not in nested:
508+
nested[schema_name] = {}
509+
510+
if table_name not in nested[schema_name]:
511+
nested[schema_name][table_name] = {}
512+
513+
for col in columns:
514+
# Use "UNKNOWN" as type since we don't have type info
515+
nested[schema_name][table_name][col] = "UNKNOWN"
516+
517+
return nested
518+
519+
520+
def _qualify_sql_with_schema(
521+
sql_query: str,
522+
external_table_columns: Dict[str, List[str]],
523+
dialect: str,
524+
) -> str:
525+
"""
526+
Qualify unqualified column references in SQL using schema information.
527+
528+
When a SQL query has multiple tables joined and columns are unqualified
529+
(no table prefix), this function uses the schema to determine which table
530+
each column belongs to and adds the appropriate table prefix.
531+
532+
Args:
533+
sql_query: The SQL query to qualify
534+
external_table_columns: Dict mapping table names to column lists
535+
dialect: SQL dialect for parsing
536+
537+
Returns:
538+
The SQL query with qualified column references
539+
"""
540+
if not external_table_columns:
541+
return sql_query
542+
543+
try:
544+
# Parse the SQL
545+
parsed = sqlglot.parse_one(sql_query, read=dialect)
546+
547+
# Convert to nested schema format
548+
nested_schema = _convert_to_nested_schema(external_table_columns)
549+
550+
# Use sqlglot's qualify_columns to add table prefixes
551+
qualified = qualify_columns.qualify_columns(
552+
parsed,
553+
schema=nested_schema,
554+
dialect=dialect,
555+
infer_schema=True,
556+
)
557+
558+
# Return the qualified SQL
559+
return qualified.sql(dialect=dialect)
560+
561+
except Exception:
562+
# If qualification fails, return original SQL
563+
# The lineage builder will handle unqualified columns as before
564+
return sql_query
565+
566+
466567
# ============================================================================
467568
# Part 1: Recursive Lineage Builder
468569
# ============================================================================
@@ -485,8 +586,12 @@ def __init__(
485586
self.dialect = dialect
486587
self.query_id = query_id
487588

488-
# Parse query structure first
489-
parser = RecursiveQueryParser(sql_query, dialect=dialect)
589+
# Qualify unqualified columns using schema info before parsing
590+
# This ensures columns like "order_date" in a JOIN get the correct table prefix
591+
qualified_sql = _qualify_sql_with_schema(sql_query, self.external_table_columns, dialect)
592+
593+
# Parse query structure using qualified SQL
594+
parser = RecursiveQueryParser(qualified_sql, dialect=dialect)
490595
self.unit_graph = parser.parse()
491596

492597
# Column lineage graph (to be built)

src/clgraph/pipeline.py

Lines changed: 76 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ def build(self, pipeline_or_graph) -> "Pipeline":
7777
# Step 2a: Run single-query lineage
7878
try:
7979
# Extract SELECT statement from DDL/DML if needed
80-
sql_for_lineage = self._extract_select_from_query(query)
80+
sql_for_lineage = self._extract_select_from_query(query, pipeline.dialect)
8181

8282
if sql_for_lineage:
8383
# Collect upstream table schemas from already-processed queries
@@ -708,23 +708,34 @@ def _is_physical_table_column(
708708

709709
return False
710710

711-
def _extract_select_from_query(self, query: ParsedQuery) -> Optional[str]:
711+
def _extract_select_from_query(
712+
self, query: ParsedQuery, dialect: str = "bigquery"
713+
) -> Optional[str]:
712714
"""
713715
Extract SELECT statement from DDL/DML queries.
714716
Single-query lineage only works on SELECT statements, so we need to extract
715717
the SELECT from CREATE TABLE AS SELECT, INSERT INTO ... SELECT, etc.
718+
719+
Args:
720+
query: The parsed query to extract SELECT from
721+
dialect: SQL dialect for proper SQL serialization (important for functions
722+
like DATE_TRUNC which have different argument orders in different dialects)
723+
724+
Returns:
725+
The SELECT SQL string, or None if no SELECT found
716726
"""
717727
ast = query.ast
718728

719729
# CREATE TABLE/VIEW AS SELECT
720730
if isinstance(ast, exp.Create):
721731
if ast.expression and isinstance(ast.expression, exp.Select):
722-
return ast.expression.sql()
732+
# Use dialect to ensure proper SQL serialization
733+
return ast.expression.sql(dialect=dialect)
723734

724735
# INSERT INTO ... SELECT
725736
elif isinstance(ast, exp.Insert):
726737
if ast.expression and isinstance(ast.expression, exp.Select):
727-
return ast.expression.sql()
738+
return ast.expression.sql(dialect=dialect)
728739

729740
# MERGE INTO statement - pass full SQL to lineage builder
730741
elif isinstance(ast, exp.Merge):
@@ -2595,6 +2606,67 @@ def to_kestra_flow(
25952606
**kwargs,
25962607
)
25972608

2609+
# ========================================================================
2610+
# Orchestrator Methods - Mage
2611+
# ========================================================================
2612+
2613+
def to_mage_pipeline(
2614+
self,
2615+
executor: Callable[[str], None],
2616+
pipeline_name: str,
2617+
description: Optional[str] = None,
2618+
connection_name: str = "clickhouse_default",
2619+
) -> Dict[str, Any]:
2620+
"""
2621+
Generate Mage pipeline files from this pipeline.
2622+
2623+
Mage is a modern data pipeline tool with a notebook-style UI and
2624+
block-based architecture. Each SQL query becomes a block (either
2625+
data_loader or transformer).
2626+
2627+
Args:
2628+
executor: Function that executes SQL (for code reference)
2629+
pipeline_name: Name for the Mage pipeline
2630+
description: Optional pipeline description (auto-generated if not provided)
2631+
connection_name: Database connection name in Mage io_config.yaml
2632+
2633+
Returns:
2634+
Dictionary with pipeline file structure:
2635+
{
2636+
"metadata.yaml": <dict>,
2637+
"blocks": {"block_name": <code>, ...}
2638+
}
2639+
2640+
Examples:
2641+
# Generate Mage pipeline files
2642+
files = pipeline.to_mage_pipeline(
2643+
executor=execute_sql,
2644+
pipeline_name="enterprise_pipeline",
2645+
)
2646+
2647+
# Write files to Mage project
2648+
import yaml
2649+
with open("pipelines/enterprise_pipeline/metadata.yaml", "w") as f:
2650+
yaml.dump(files["metadata.yaml"], f)
2651+
for name, code in files["blocks"].items():
2652+
with open(f"pipelines/enterprise_pipeline/{name}.py", "w") as f:
2653+
f.write(code)
2654+
2655+
Note:
2656+
- First query (no dependencies) becomes data_loader block
2657+
- Subsequent queries become transformer blocks
2658+
- Dependencies are managed via upstream_blocks/downstream_blocks
2659+
- Requires mage-ai package and ClickHouse connection in io_config.yaml
2660+
"""
2661+
from .orchestrators import MageOrchestrator
2662+
2663+
return MageOrchestrator(self).to_pipeline_files(
2664+
executor=executor,
2665+
pipeline_name=pipeline_name,
2666+
description=description,
2667+
connection_name=connection_name,
2668+
)
2669+
25982670
# ========================================================================
25992671
# Validation Methods
26002672
# ========================================================================

0 commit comments

Comments
 (0)