Skip to content

Commit b10e182

Browse files
committed
feat: add clgraph CLI with analyze, diff, and mcp commands
Adds a Typer-based CLI so users can interact with lineage from the terminal without writing Python. Three subcommands wrap existing Pipeline methods: - clgraph analyze: parse SQL and display table/column/edge summary - clgraph diff: compare lineage between two pipeline versions - clgraph mcp: start MCP server for LLM integration Supports --format table (Rich), json, and dot output modes.
1 parent a7337b8 commit b10e182

4 files changed

Lines changed: 511 additions & 0 deletions

File tree

pyproject.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ dependencies = [
3232
"sqlglot>=28.0.0",
3333
"graphviz>=0.20.0",
3434
"jinja2>=3.0.0",
35+
"typer>=0.12.0",
36+
"rich>=13.0.0",
3537
]
3638

3739
[project.optional-dependencies]
@@ -91,6 +93,9 @@ Documentation = "https://github.com/mingjerli/clgraph#readme"
9193
Repository = "https://github.com/mingjerli/clgraph"
9294
Issues = "https://github.com/mingjerli/clgraph/issues"
9395

96+
[project.scripts]
97+
clgraph = "clgraph.cli:app"
98+
9499
[tool.hatch.build.targets.wheel]
95100
packages = ["src/clgraph"]
96101

src/clgraph/cli.py

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
"""
2+
clgraph CLI — Column lineage analysis from the terminal.
3+
4+
Usage:
5+
clgraph analyze ./sql/ --dialect bigquery
6+
clgraph analyze query.sql --format json
7+
clgraph analyze query.sql --format dot
8+
clgraph diff old/ new/ --dialect bigquery
9+
clgraph mcp --pipeline ./sql/
10+
"""
11+
12+
import json
13+
from pathlib import Path
14+
15+
import typer
16+
from typing_extensions import Annotated
17+
18+
app = typer.Typer(
19+
name="clgraph",
20+
help="Column lineage and pipeline dependency analysis for SQL.",
21+
no_args_is_help=True,
22+
)
23+
24+
25+
def _load_pipeline(path: Path, dialect: str):
26+
"""Load a Pipeline from a file or directory."""
27+
from clgraph import Pipeline
28+
29+
if path.is_dir():
30+
return Pipeline.from_sql_files(str(path), dialect=dialect)
31+
elif path.suffix == ".sql":
32+
sql = path.read_text()
33+
return Pipeline.from_sql_string(sql, dialect=dialect)
34+
elif path.suffix == ".json":
35+
return Pipeline.from_json_file(str(path))
36+
else:
37+
typer.echo(f"Error: unsupported file type: {path.suffix}", err=True)
38+
raise typer.Exit(code=1)
39+
40+
41+
def _print_table_summary(pipeline):
42+
"""Print a Rich table summary of the pipeline."""
43+
import sys
44+
45+
from rich.console import Console
46+
from rich.table import Table
47+
48+
console = Console(file=sys.stdout)
49+
50+
summary = Table(title="Pipeline Tables")
51+
summary.add_column("Table", style="cyan")
52+
summary.add_column("Type", style="green")
53+
summary.add_column("Columns", justify="right")
54+
summary.add_column("Upstream", justify="right")
55+
summary.add_column("Downstream", justify="right")
56+
57+
for table_name, table_node in pipeline.table_graph.tables.items():
58+
col_count = len(list(pipeline.get_columns_by_table(table_name)))
59+
upstream = pipeline.table_graph.get_dependencies(table_name)
60+
downstream = pipeline.table_graph.get_downstream(table_name)
61+
kind = "source" if table_node.is_source else "derived"
62+
63+
summary.add_row(
64+
table_name,
65+
kind,
66+
str(col_count),
67+
str(len(upstream)),
68+
str(len(downstream)),
69+
)
70+
71+
console.print(summary)
72+
73+
total_tables = len(pipeline.table_graph.tables)
74+
total_cols = len(pipeline.columns)
75+
total_edges = len(pipeline.edges)
76+
console.print(
77+
f"\n[bold]{total_tables}[/bold] tables, "
78+
f"[bold]{total_cols}[/bold] columns, "
79+
f"[bold]{total_edges}[/bold] lineage edges"
80+
)
81+
82+
issues = pipeline.get_all_issues()
83+
if issues:
84+
errors = [i for i in issues if i.severity.value == "error"]
85+
warnings = [i for i in issues if i.severity.value == "warning"]
86+
if errors:
87+
console.print(f"[red]{len(errors)} errors[/red]")
88+
if warnings:
89+
console.print(f"[yellow]{len(warnings)} warnings[/yellow]")
90+
91+
92+
def _print_json_summary(pipeline):
93+
"""Print pipeline summary as JSON."""
94+
tables = []
95+
for table_name, table_node in pipeline.table_graph.tables.items():
96+
columns = [
97+
{
98+
"name": col.column_name,
99+
"type": col.node_type,
100+
"pii": col.pii,
101+
}
102+
for col in pipeline.get_columns_by_table(table_name)
103+
]
104+
tables.append(
105+
{
106+
"name": table_name,
107+
"is_source": table_node.is_source,
108+
"columns": columns,
109+
}
110+
)
111+
112+
output = {
113+
"dialect": pipeline.dialect,
114+
"tables": tables,
115+
"columns": len(pipeline.columns),
116+
"edges": len(pipeline.edges),
117+
"issues": len(pipeline.get_all_issues()),
118+
}
119+
typer.echo(json.dumps(output, indent=2, default=str))
120+
121+
122+
def _print_dot(pipeline):
123+
"""Print pipeline lineage as Graphviz DOT."""
124+
from clgraph.visualizations import visualize_table_dependencies
125+
126+
dot = visualize_table_dependencies(pipeline.table_graph)
127+
typer.echo(dot.source)
128+
129+
130+
@app.command()
131+
def analyze(
132+
path: Annotated[
133+
Path,
134+
typer.Argument(help="Path to SQL file, directory, or JSON pipeline file"),
135+
],
136+
dialect: Annotated[
137+
str,
138+
typer.Option(help="SQL dialect"),
139+
] = "bigquery",
140+
output_format: Annotated[
141+
str,
142+
typer.Option("--format", "-f", help="Output format: table, json, dot"),
143+
] = "table",
144+
):
145+
"""Analyze SQL files and display column lineage summary."""
146+
if not path.exists():
147+
typer.echo(f"Error: path does not exist: {path}", err=True)
148+
raise typer.Exit(code=1)
149+
150+
pipeline = _load_pipeline(path, dialect)
151+
152+
if output_format == "json":
153+
_print_json_summary(pipeline)
154+
elif output_format == "dot":
155+
_print_dot(pipeline)
156+
else:
157+
_print_table_summary(pipeline)
158+
159+
160+
@app.command()
161+
def diff(
162+
old_path: Annotated[
163+
Path,
164+
typer.Argument(help="Path to old SQL file or directory"),
165+
],
166+
new_path: Annotated[
167+
Path,
168+
typer.Argument(help="Path to new SQL file or directory"),
169+
],
170+
dialect: Annotated[
171+
str,
172+
typer.Option(help="SQL dialect"),
173+
] = "bigquery",
174+
output_format: Annotated[
175+
str,
176+
typer.Option("--format", "-f", help="Output format: table, json"),
177+
] = "table",
178+
):
179+
"""Compare lineage between two SQL pipeline versions."""
180+
if not old_path.exists():
181+
typer.echo(f"Error: path does not exist: {old_path}", err=True)
182+
raise typer.Exit(code=1)
183+
if not new_path.exists():
184+
typer.echo(f"Error: path does not exist: {new_path}", err=True)
185+
raise typer.Exit(code=1)
186+
187+
old_pipeline = _load_pipeline(old_path, dialect)
188+
new_pipeline = _load_pipeline(new_path, dialect)
189+
190+
diff_result = new_pipeline.diff(old_pipeline)
191+
192+
if output_format == "json":
193+
output = {
194+
"columns_added": diff_result.columns_added,
195+
"columns_removed": diff_result.columns_removed,
196+
"columns_modified": [
197+
{
198+
"column": cd.full_name,
199+
"field": cd.field_name,
200+
"old_value": cd.old_value,
201+
"new_value": cd.new_value,
202+
}
203+
for cd in diff_result.columns_modified
204+
],
205+
"has_changes": diff_result.has_changes(),
206+
}
207+
typer.echo(json.dumps(output, indent=2, default=str))
208+
else:
209+
_print_diff_summary(diff_result)
210+
211+
212+
def _print_diff_summary(diff_result):
213+
"""Print diff summary as a Rich table."""
214+
import sys
215+
216+
from rich.console import Console
217+
218+
console = Console(file=sys.stdout)
219+
220+
added = diff_result.columns_added
221+
removed = diff_result.columns_removed
222+
modified = diff_result.columns_modified
223+
224+
if not added and not removed and not modified:
225+
console.print("[green]No lineage changes detected.[/green]")
226+
return
227+
228+
if added:
229+
console.print(f"\n[green]+{len(added)} columns added[/green]")
230+
for col in added:
231+
console.print(f" [green]+ {col}[/green]")
232+
233+
if removed:
234+
console.print(f"\n[red]-{len(removed)} columns removed[/red]")
235+
for col in removed:
236+
console.print(f" [red]- {col}[/red]")
237+
238+
if modified:
239+
console.print(f"\n[yellow]~{len(modified)} columns modified[/yellow]")
240+
for cd in modified:
241+
console.print(f" [yellow]~ {cd.full_name} ({cd.field_name})[/yellow]")
242+
243+
244+
@app.command()
245+
def mcp(
246+
pipeline: Annotated[
247+
Path,
248+
typer.Option("--pipeline", "-p", help="Path to SQL files directory or JSON pipeline file"),
249+
],
250+
dialect: Annotated[
251+
str,
252+
typer.Option(help="SQL dialect"),
253+
] = "bigquery",
254+
transport: Annotated[
255+
str,
256+
typer.Option(help="Transport type: stdio, http"),
257+
] = "stdio",
258+
no_llm_tools: Annotated[
259+
bool,
260+
typer.Option("--no-llm-tools", help="Exclude LLM-dependent tools"),
261+
] = False,
262+
):
263+
"""Start MCP server for LLM integration (Claude Desktop, etc.).
264+
265+
Requires the mcp extra: pip install clgraph[mcp]
266+
"""
267+
if not pipeline.exists():
268+
typer.echo(f"Error: path does not exist: {pipeline}", err=True)
269+
raise typer.Exit(code=1)
270+
271+
loaded = _load_pipeline(pipeline, dialect)
272+
273+
try:
274+
from clgraph.mcp import run_mcp_server
275+
except ImportError as err:
276+
typer.echo(
277+
"Error: MCP dependencies not installed. Install with: pip install clgraph[mcp]",
278+
err=True,
279+
)
280+
raise typer.Exit(code=1) from err
281+
282+
run_mcp_server(
283+
loaded,
284+
llm=None,
285+
include_llm_tools=not no_llm_tools,
286+
transport=transport,
287+
)
288+
289+
290+
if __name__ == "__main__":
291+
app()

0 commit comments

Comments
 (0)