1+ {
2+ "cells" : [
3+ {
4+ "cell_type" : " markdown" ,
5+ "id" : " a1b2c3d4" ,
6+ "metadata" : {},
7+ "source" : [
8+ " # JOIN Predicate Column Lineage\n " ,
9+ " \n " ,
10+ " **Example: Tracking JOIN ON Predicate Columns in Column Lineage (Gap 7)**\n " ,
11+ " \n " ,
12+ " \n " ,
13+ " This example demonstrates how clgraph tracks JOIN ON predicate columns as\n " ,
14+ " lineage edges. Before Gap 7, only value-flow columns (columns in SELECT)\n " ,
15+ " appeared in the lineage graph. Now, columns used in JOIN ON clauses are\n " ,
16+ " tracked as predicate edges, making previously invisible dependencies\n " ,
17+ " visible for impact analysis.\n " ,
18+ " \n " ,
19+ " Key features demonstrated:\n " ,
20+ " 1. Basic equi-join predicate edges with metadata\n " ,
21+ " 2. Point-in-time / range join (BETWEEN) with 5 predicate columns\n " ,
22+ " 3. Multi-join chain with per-join scoped predicate edges\n " ,
23+ " 4. Impact analysis using predicate edges with SQLColumnTracer"
24+ ]
25+ },
26+ {
27+ "cell_type" : " markdown" ,
28+ "id" : " b2c3d4e5" ,
29+ "metadata" : {},
30+ "source" : [
31+ " ### Imports"
32+ ]
33+ },
34+ {
35+ "cell_type" : " code" ,
36+ "execution_count" : null ,
37+ "id" : " c3d4e5f6" ,
38+ "metadata" : {},
39+ "outputs" : [],
40+ "source" : [
41+ " from clgraph import Pipeline, RecursiveLineageBuilder, SQLColumnTracer\n " ,
42+ " \n " ,
43+ " \n " ,
44+ " def predicate_edges(graph):\n " ,
45+ " \"\"\" Return only edges where is_join_predicate is True.\"\"\"\n " ,
46+ " return [e for e in graph.edges if e.is_join_predicate]\n " ,
47+ " \n " ,
48+ " \n " ,
49+ " def predicate_edges_to(graph, target):\n " ,
50+ " \"\"\" Return predicate edges targeting a specific output column.\"\"\"\n " ,
51+ " return [e for e in graph.edges if e.is_join_predicate and e.to_node.full_name == target]\n " ,
52+ " \n " ,
53+ " \n " ,
54+ " # ============================================================\n " ,
55+ " # Example 1: Basic Equi-Join Predicate Edges\n " ,
56+ " # ============================================================\n " ,
57+ " print(\" =\" * 60)\n " ,
58+ " print(\" Example 1: Basic Equi-Join Predicate Edges\" )\n " ,
59+ " print(\" =\" * 60)\n " ,
60+ " \n " ,
61+ " sql_1 = \"\"\"\n " ,
62+ " SELECT o.order_id, o.amount, d.city AS customer_city\n " ,
63+ " FROM raw_orders o\n " ,
64+ " LEFT JOIN dim_customer d ON o.customer_id = d.id\n " ,
65+ " \"\"\"\n " ,
66+ " \n " ,
67+ " builder_1 = RecursiveLineageBuilder(sql_1, dialect=\" bigquery\" )\n " ,
68+ " graph_1 = builder_1.build()\n " ,
69+ " \n " ,
70+ " print(f\"\\ nQuery:{sql_1}\" )\n " ,
71+ " print(\" 1a. Value edges (standard lineage):\" )\n " ,
72+ " for edge in graph_1.edges:\n " ,
73+ " if not edge.is_join_predicate:\n " ,
74+ " print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name}\" )\n " ,
75+ " \n " ,
76+ " print(\"\\ n1b. Predicate edges (NEW \\ u2014 from JOIN ON clause):\" )\n " ,
77+ " for edge in predicate_edges(graph_1):\n " ,
78+ " print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name}\" )\n " ,
79+ " print(f\" \\ u2022 edge_type = {edge.edge_type}\" )\n " ,
80+ " print(f\" \\ u2022 join_side = {edge.join_side}\" )\n " ,
81+ " print(f\" \\ u2022 join_condition = {edge.join_condition}\" )\n " ,
82+ " \n " ,
83+ " print(\"\\ n1c. Compare: d.city \\ u2192 output.customer_city is a VALUE edge:\" )\n " ,
84+ " for edge in graph_1.edges:\n " ,
85+ " if edge.from_node.full_name == \" dim_customer.city\" :\n " ,
86+ " print(f\" is_join_predicate = {edge.is_join_predicate} \\ u2713 (value flow, not predicate)\" )\n " ,
87+ " \n " ,
88+ " \n " ,
89+ " # ============================================================\n " ,
90+ " # Example 2: Point-in-Time / Range Join (BETWEEN)\n " ,
91+ " # ============================================================\n " ,
92+ " print(\"\\ n\" + \" =\" * 60)\n " ,
93+ " print(\" Example 2: Point-in-Time Join (BETWEEN)\" )\n " ,
94+ " print(\" =\" * 60)\n " ,
95+ " \n " ,
96+ " sql_2 = \"\"\"\n " ,
97+ " SELECT o.order_id, o.customer_id, o.order_ts, o.amount,\n " ,
98+ " d.city AS customer_city_at_order\n " ,
99+ " FROM raw_orders o\n " ,
100+ " LEFT JOIN dim_customer d\n " ,
101+ " ON o.customer_id = d.id\n " ,
102+ " AND o.order_ts BETWEEN d.start_time AND d.end_time\n " ,
103+ " \"\"\"\n " ,
104+ " \n " ,
105+ " builder_2 = RecursiveLineageBuilder(sql_2, dialect=\" bigquery\" )\n " ,
106+ " graph_2 = builder_2.build()\n " ,
107+ " \n " ,
108+ " print(f\"\\ nQuery:{sql_2}\" )\n " ,
109+ " print(\" 2a. All predicate edges \\ u2192 customer_city_at_order:\" )\n " ,
110+ " pred_edges_2 = predicate_edges_to(graph_2, \" output.customer_city_at_order\" )\n " ,
111+ " for edge in pred_edges_2:\n " ,
112+ " print(f\" {edge.from_node.full_name:30s} (join_side={edge.join_side})\" )\n " ,
113+ " \n " ,
114+ " print(f\"\\ n \\ u2192 {len(pred_edges_2)} predicate columns detected\" )\n " ,
115+ " print(\" \\ u2192 d.start_time and d.end_time were previously INVISIBLE in lineage\" )\n " ,
116+ " \n " ,
117+ " print(\"\\ n2b. Value edge is unchanged:\" )\n " ,
118+ " for edge in graph_2.edges:\n " ,
119+ " if edge.from_node.full_name == \" dim_customer.city\" and not edge.is_join_predicate:\n " ,
120+ " print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name} (value edge \\ u2713)\" )\n " ,
121+ " \n " ,
122+ " \n " ,
123+ " # ============================================================\n " ,
124+ " # Example 3: Multi-Join Chain with Per-Join Scoping\n " ,
125+ " # ============================================================\n " ,
126+ " print(\"\\ n\" + \" =\" * 60)\n " ,
127+ " print(\" Example 3: Multi-Join Chain \\ u2014 Per-Join Scoping\" )\n " ,
128+ " print(\" =\" * 60)\n " ,
129+ " \n " ,
130+ " sql_3 = \"\"\"\n " ,
131+ " SELECT a.id, b.val, c.label\n " ,
132+ " FROM table_a a\n " ,
133+ " INNER JOIN table_b b ON a.id = b.a_id\n " ,
134+ " INNER JOIN table_c c ON b.id = c.b_id AND b.category = c.category\n " ,
135+ " \"\"\"\n " ,
136+ " \n " ,
137+ " builder_3 = RecursiveLineageBuilder(sql_3, dialect=\" bigquery\" )\n " ,
138+ " graph_3 = builder_3.build()\n " ,
139+ " \n " ,
140+ " print(f\"\\ nQuery:{sql_3}\" )\n " ,
141+ " print(\" 3a. First join predicates (a.id = b.a_id) \\ u2192 output.val ONLY:\" )\n " ,
142+ " for edge in predicate_edges_to(graph_3, \" output.val\" ):\n " ,
143+ " print(f\" {edge.from_node.full_name} -> output.val\" )\n " ,
144+ " \n " ,
145+ " print(\n " ,
146+ " \"\\ n3b. Second join predicates (b.id = c.b_id AND b.category = c.category) \\ u2192 output.label ONLY:\"\n " ,
147+ " )\n " ,
148+ " for edge in predicate_edges_to(graph_3, \" output.label\" ):\n " ,
149+ " print(f\" {edge.from_node.full_name} -> output.label\" )\n " ,
150+ " \n " ,
151+ " print(\"\\ n3c. No cross-join leakage \\ u2014 first join predicates do NOT target output.label:\" )\n " ,
152+ " label_pred_sources = {e.from_node.full_name for e in predicate_edges_to(graph_3, \" output.label\" )}\n " ,
153+ " assert \" table_a.id\" not in label_pred_sources, \" Cross-join leakage detected!\"\n " ,
154+ " assert \" table_b.a_id\" not in label_pred_sources, \" Cross-join leakage detected!\"\n " ,
155+ " print(\" \\ u2713 table_a.id NOT in output.label predicates\" )\n " ,
156+ " print(\" \\ u2713 table_b.a_id NOT in output.label predicates\" )\n " ,
157+ " \n " ,
158+ " \n " ,
159+ " # ============================================================\n " ,
160+ " # Example 4: Impact Analysis with Predicate Edges\n " ,
161+ " # ============================================================\n " ,
162+ " print(\"\\ n\" + \" =\" * 60)\n " ,
163+ " print(\" Example 4: Impact Analysis with Predicate Edges\" )\n " ,
164+ " print(\" =\" * 60)\n " ,
165+ " \n " ,
166+ " print(\"\\ nUsing the point-in-time join from Example 2.\" )\n " ,
167+ " \n " ,
168+ " print(\"\\ n4a. Forward trace from dim_customer.start_time (SQLColumnTracer):\" )\n " ,
169+ " tracer = SQLColumnTracer(sql_2, dialect=\" bigquery\" )\n " ,
170+ " forward = tracer.get_forward_lineage([\" dim_customer.start_time\" ])\n " ,
171+ " print(f\" Impacted outputs: {forward['impacted_outputs']}\" )\n " ,
172+ " print(\" \\ u2192 customer_city_at_order is now reachable (was invisible before Gap 7)\" )\n " ,
173+ " \n " ,
174+ " print(\"\\ n4b. Filter predicate vs value edges using is_join_predicate:\" )\n " ,
175+ " value_edges_2 = [e for e in graph_2.edges if not e.is_join_predicate]\n " ,
176+ " pred_edges_all_2 = [e for e in graph_2.edges if e.is_join_predicate]\n " ,
177+ " print(f\" Value edges: {len(value_edges_2)}\" )\n " ,
178+ " print(f\" Predicate edges: {len(pred_edges_all_2)}\" )\n " ,
179+ " print(f\" Total edges: {len(graph_2.edges)}\" )\n " ,
180+ " \n " ,
181+ " print(\"\\ n4c. Existing value lineage is unchanged:\" )\n " ,
182+ " for edge in value_edges_2:\n " ,
183+ " print(f\" {edge.from_node.full_name} -> {edge.to_node.full_name}\" )\n " ,
184+ " \n " ,
185+ " print(\"\\ n\" + \" =\" * 60)\n " ,
186+ " print(\" JOIN Predicate Lineage Examples Complete!\" )\n " ,
187+ " print(\" =\" * 60)"
188+ ]
189+ },
190+ {
191+ "cell_type" : " markdown" ,
192+ "id" : " d4e5f6a7" ,
193+ "metadata" : {},
194+ "source" : [
195+ " ### Visualize Point-in-Time Join Lineage\n " ,
196+ " \n " ,
197+ " Display the column lineage graph for the point-in-time join (Example 2),\n " ,
198+ " showing both value and predicate edges."
199+ ]
200+ },
201+ {
202+ "cell_type" : " code" ,
203+ "execution_count" : null ,
204+ "id" : " e5f6a7b8" ,
205+ "metadata" : {},
206+ "outputs" : [],
207+ "source" : [
208+ " import shutil\n " ,
209+ " \n " ,
210+ " from clgraph import visualize_pipeline_lineage\n " ,
211+ " \n " ,
212+ " # Create pipeline for visualization using the point-in-time join\n " ,
213+ " sql_pit = \"\"\"\n " ,
214+ " SELECT o.order_id, o.customer_id, o.order_ts, o.amount,\n " ,
215+ " d.city AS customer_city_at_order\n " ,
216+ " FROM raw_orders o\n " ,
217+ " LEFT JOIN dim_customer d\n " ,
218+ " ON o.customer_id = d.id\n " ,
219+ " AND o.order_ts BETWEEN d.start_time AND d.end_time\n " ,
220+ " \"\"\"\n " ,
221+ " pit_pipeline = Pipeline([(\" pit_join\" , sql_pit)], dialect=\" bigquery\" )\n " ,
222+ " \n " ,
223+ " if shutil.which(\" dot\" ) is None:\n " ,
224+ " print(\"\\ u26a0\\ ufe0f Graphviz not installed. Install with: brew install graphviz\" )\n " ,
225+ " else:\n " ,
226+ " print(\" Point-in-Time Join \\ u2014 Column Lineage (value + predicate edges):\" )\n " ,
227+ " display(visualize_pipeline_lineage(pit_pipeline.column_graph.to_simplified()))"
228+ ]
229+ }
230+ ],
231+ "metadata" : {
232+ "kernelspec" : {
233+ "display_name" : " Python 3 (ipykernel)" ,
234+ "language" : " python" ,
235+ "name" : " python3"
236+ },
237+ "language_info" : {
238+ "name" : " python" ,
239+ "version" : " 3.12.0"
240+ }
241+ },
242+ "nbformat" : 4 ,
243+ "nbformat_minor" : 5
244+ }
0 commit comments