Skip to content

Commit 4f9cc43

Browse files
Vignesh Narayanaswamyclaude
authored andcommitted
test: add prefect_connector tests, fix import fallbacks
9 tests covering discovery, dedup, tag filtering, schedule extraction, source_updated_at, and empty results. Also fix DeploymentFilter/Tags fallbacks for environments without prefect installed. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 446ed80 commit 4f9cc43

File tree

2 files changed

+120
-0
lines changed

2 files changed

+120
-0
lines changed

src/model_ledger/connectors/prefect.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@
1616
from prefect.client.schemas.filters import DeploymentFilter, DeploymentFilterTags
1717
except ImportError: # pragma: no cover
1818
get_client = None # type: ignore[assignment]
19+
DeploymentFilter = None # type: ignore[assignment,misc]
20+
DeploymentFilterTags = None # type: ignore[assignment,misc]
1921

2022
from model_ledger.graph.models import DataNode
2123

Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
"""Tests for prefect_connector factory."""
2+
from unittest.mock import AsyncMock, MagicMock, patch
3+
from model_ledger.connectors.prefect import prefect_connector
4+
from model_ledger.graph.protocol import SourceConnector
5+
6+
7+
def _make_deployment(name, tags=None, schedules=None, updated=None, entrypoint=None, version=None):
8+
dep = MagicMock()
9+
dep.name = name
10+
dep.tags = tags or []
11+
dep.schedules = schedules or []
12+
dep.updated = updated
13+
dep.entrypoint = entrypoint
14+
dep.version = version
15+
return dep
16+
17+
18+
def _make_schedule(cron="0 4 * * *"):
19+
sched_obj = MagicMock()
20+
sched_obj.schedule.cron = cron
21+
return sched_obj
22+
23+
24+
def _patch_prefect(deployments):
25+
"""Patch get_client to return mock deployments."""
26+
client = AsyncMock()
27+
client.read_deployments = AsyncMock(side_effect=[deployments, []])
28+
client.__aenter__ = AsyncMock(return_value=client)
29+
client.__aexit__ = AsyncMock(return_value=False)
30+
return patch("model_ledger.connectors.prefect.get_client", return_value=client)
31+
32+
33+
def test_returns_source_connector():
34+
with _patch_prefect([]):
35+
c = prefect_connector(name="test")
36+
assert isinstance(c, SourceConnector)
37+
assert c.name == "test"
38+
39+
40+
def test_discovers_deployments():
41+
deps = [
42+
_make_deployment("scorer-prod", tags=["application:risk-ml", "repo:forge-risk"]),
43+
_make_deployment("etl-prod", tags=["application:data-eng"]),
44+
]
45+
with _patch_prefect(deps):
46+
nodes = prefect_connector().discover()
47+
assert len(nodes) == 2
48+
assert nodes[0].name == "scorer-prod"
49+
assert nodes[0].metadata["application"] == "risk-ml"
50+
assert nodes[1].name == "etl-prod"
51+
52+
53+
def test_deduplicates_by_name():
54+
deps = [
55+
_make_deployment("scorer-prod"),
56+
_make_deployment("scorer-prod"),
57+
]
58+
with _patch_prefect(deps):
59+
nodes = prefect_connector().discover()
60+
assert len(nodes) == 1
61+
62+
63+
def test_no_tag_filter_returns_all():
64+
deps = [
65+
_make_deployment("a", tags=["deploy_from:main"]),
66+
_make_deployment("b", tags=["deploy_from:user"]),
67+
]
68+
with _patch_prefect(deps):
69+
nodes = prefect_connector(tag_filter=None).discover()
70+
assert len(nodes) == 2
71+
72+
73+
def test_tag_filter_passed_to_api():
74+
client = AsyncMock()
75+
client.read_deployments = AsyncMock(return_value=[])
76+
client.__aenter__ = AsyncMock(return_value=client)
77+
client.__aexit__ = AsyncMock(return_value=False)
78+
79+
mock_filter = MagicMock()
80+
mock_tags = MagicMock()
81+
82+
with patch("model_ledger.connectors.prefect.get_client", return_value=client), \
83+
patch("model_ledger.connectors.prefect.DeploymentFilter", mock_filter), \
84+
patch("model_ledger.connectors.prefect.DeploymentFilterTags", mock_tags):
85+
prefect_connector(tag_filter=["deploy_from:main"]).discover()
86+
mock_tags.assert_called_once_with(all_=["deploy_from:main"])
87+
mock_filter.assert_called_once()
88+
89+
90+
def test_extracts_schedule():
91+
deps = [_make_deployment("scheduled", schedules=[_make_schedule("0 6 * * *")])]
92+
with _patch_prefect(deps):
93+
nodes = prefect_connector().discover()
94+
assert nodes[0].metadata["schedule"] == "0 6 * * *"
95+
assert nodes[0].metadata["has_schedule"] is True
96+
97+
98+
def test_no_schedule():
99+
deps = [_make_deployment("unscheduled", schedules=[])]
100+
with _patch_prefect(deps):
101+
nodes = prefect_connector().discover()
102+
assert nodes[0].metadata["schedule"] is None
103+
assert nodes[0].metadata["has_schedule"] is False
104+
105+
106+
def test_extracts_source_updated_at():
107+
from datetime import datetime, timezone
108+
ts = datetime(2026, 4, 14, 12, 0, 0, tzinfo=timezone.utc)
109+
deps = [_make_deployment("model", updated=ts)]
110+
with _patch_prefect(deps):
111+
nodes = prefect_connector().discover()
112+
assert nodes[0].metadata["source_updated_at"] == ts.isoformat()
113+
114+
115+
def test_empty_result():
116+
with _patch_prefect([]):
117+
nodes = prefect_connector().discover()
118+
assert nodes == []

0 commit comments

Comments
 (0)