Skip to content

Commit 7bc8352

Browse files
fix: fail closed python sanctions screening
1 parent 340e8ff commit 7bc8352

3 files changed

Lines changed: 84 additions & 48 deletions

File tree

sdk/python/tests/test_compliance_engine.py

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,18 @@
33
import pytest
44

55
from zeroid.compliance.engine import ComplianceEngine, ComplianceCheckResult, CrossBorderCheckResult
6+
from zeroid.compliance.screening import SanctionsScreener, ScreeningEntry, ScreeningListType
7+
8+
9+
def configured_screener() -> SanctionsScreener:
10+
return SanctionsScreener([
11+
ScreeningEntry(
12+
name="Lazarus Group",
13+
list_type=ScreeningListType.SANCTIONS,
14+
jurisdiction="KP",
15+
identifiers=["0x" + "de" * 20],
16+
),
17+
])
618

719

820
class TestComplianceEngine:
@@ -45,27 +57,33 @@ def test_check_credentials_high_risk_warning(self) -> None:
4557
assert any("risk" in w.lower() for w in result.warnings)
4658

4759
def test_screen_entity_by_name(self) -> None:
48-
engine = ComplianceEngine()
60+
engine = ComplianceEngine(screener=configured_screener())
4961
result = engine.screen_entity(name="Lazarus Group")
5062
assert result.matched is True
5163

5264
def test_screen_entity_by_identifier(self) -> None:
53-
engine = ComplianceEngine()
65+
engine = ComplianceEngine(screener=configured_screener())
5466
result = engine.screen_entity(identifier="0x" + "de" * 20)
5567
assert result.matched is True
5668

5769
def test_screen_entity_no_match(self) -> None:
58-
engine = ComplianceEngine()
70+
engine = ComplianceEngine(screener=configured_screener())
5971
result = engine.screen_entity(name="Good Actor")
6072
assert result.matched is False
6173

74+
def test_screen_entity_default_screener_fails_closed(self) -> None:
75+
engine = ComplianceEngine()
76+
result = engine.screen_entity(name="Good Actor")
77+
assert result.matched is True
78+
assert result.error
79+
6280
def test_screen_entity_no_args_raises(self) -> None:
6381
engine = ComplianceEngine()
6482
with pytest.raises(ValueError, match="At least one"):
6583
engine.screen_entity()
6684

6785
def test_screen_entity_name_no_match_falls_to_identifier(self) -> None:
68-
engine = ComplianceEngine()
86+
engine = ComplianceEngine(screener=configured_screener())
6987
result = engine.screen_entity(
7088
name="Not Found", identifier="0x" + "de" * 20
7189
)

sdk/python/tests/test_compliance_screening.py

Lines changed: 39 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,32 @@
88
)
99

1010

11+
def sample_entries() -> list[ScreeningEntry]:
12+
return [
13+
ScreeningEntry(
14+
name="Lazarus Group",
15+
list_type=ScreeningListType.SANCTIONS,
16+
jurisdiction="KP",
17+
identifiers=["0x" + "de" * 20, "lazarus.kp"],
18+
reason="State-sponsored cyber operations",
19+
),
20+
ScreeningEntry(
21+
name="Tornado Cash",
22+
list_type=ScreeningListType.SANCTIONS,
23+
jurisdiction="GLOBAL",
24+
identifiers=["0x" + "ca" * 20],
25+
reason="OFAC SDN listing - mixer service",
26+
),
27+
ScreeningEntry(
28+
name="Test PEP Entity",
29+
list_type=ScreeningListType.PEP,
30+
jurisdiction="XX",
31+
identifiers=["pep-test-001"],
32+
reason="Politically exposed person - test entry",
33+
),
34+
]
35+
36+
1137
class TestScreeningListType:
1238
def test_values(self) -> None:
1339
assert ScreeningListType.SANCTIONS.value == "sanctions"
@@ -34,54 +60,55 @@ def test_creation(self) -> None:
3460

3561

3662
class TestSanctionsScreener:
37-
def test_defaults_loaded(self) -> None:
63+
def test_default_unconfigured_fails_closed(self) -> None:
3864
screener = SanctionsScreener()
3965
result = screener.screen_name("Lazarus")
4066
assert result.matched is True
41-
assert len(result.matches) == 1
67+
assert result.matches == []
68+
assert result.error
4269

4370
def test_screen_name_no_match(self) -> None:
44-
screener = SanctionsScreener()
71+
screener = SanctionsScreener(sample_entries())
4572
result = screener.screen_name("Innocent Corp")
4673
assert result.matched is False
4774

4875
def test_screen_name_case_insensitive(self) -> None:
49-
screener = SanctionsScreener()
76+
screener = SanctionsScreener(sample_entries())
5077
result = screener.screen_name("lazarus group")
5178
assert result.matched is True
5279

5380
def test_screen_name_substring(self) -> None:
54-
screener = SanctionsScreener()
81+
screener = SanctionsScreener(sample_entries())
5582
result = screener.screen_name("Tornado")
5683
assert result.matched is True
5784

5885
def test_screen_identifier_match(self) -> None:
59-
screener = SanctionsScreener()
86+
screener = SanctionsScreener(sample_entries())
6087
result = screener.screen_identifier("0x" + "de" * 20)
6188
assert result.matched is True
6289

6390
def test_screen_identifier_no_match(self) -> None:
64-
screener = SanctionsScreener()
91+
screener = SanctionsScreener(sample_entries())
6592
result = screener.screen_identifier("0x" + "00" * 20)
6693
assert result.matched is False
6794

6895
def test_screen_identifier_case_insensitive(self) -> None:
69-
screener = SanctionsScreener()
96+
screener = SanctionsScreener(sample_entries())
7097
result = screener.screen_identifier("0x" + "DE" * 20)
7198
assert result.matched is True
7299

73100
def test_screen_jurisdiction_match(self) -> None:
74-
screener = SanctionsScreener()
101+
screener = SanctionsScreener(sample_entries())
75102
result = screener.screen_jurisdiction("KP")
76103
assert result.matched is True
77104

78105
def test_screen_jurisdiction_no_match(self) -> None:
79-
screener = SanctionsScreener()
106+
screener = SanctionsScreener(sample_entries())
80107
result = screener.screen_jurisdiction("US")
81108
assert result.matched is False
82109

83110
def test_screen_jurisdiction_case_insensitive(self) -> None:
84-
screener = SanctionsScreener()
111+
screener = SanctionsScreener(sample_entries())
85112
result = screener.screen_jurisdiction("kp")
86113
assert result.matched is True
87114

@@ -100,6 +127,6 @@ def test_add_entry(self) -> None:
100127
assert result2.matched is True
101128

102129
def test_query_preserved(self) -> None:
103-
screener = SanctionsScreener()
130+
screener = SanctionsScreener(sample_entries())
104131
result = screener.screen_name("TestQuery")
105132
assert result.query == "TestQuery"

sdk/python/zeroid/compliance/screening.py

Lines changed: 23 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
"""Sanctions and PEP screening.
22
3-
Provides screening against mock sanctions/PEP watchlists for
4-
compliance checking.
3+
Callers must provide current, authoritative watchlist entries before using this
4+
module for compliance decisions. An unconfigured screener fails closed.
55
"""
66

77
from __future__ import annotations
@@ -50,41 +50,15 @@ class ScreeningResult:
5050
matched: bool
5151
matches: list[ScreeningEntry] = field(default_factory=list)
5252
query: str = ""
53+
error: str = ""
5354

5455

5556
class SanctionsScreener:
5657
"""Screens entities against sanctions and PEP watchlists."""
5758

58-
def __init__(self) -> None:
59-
"""Initialize the screener with a default mock watchlist."""
60-
self._entries: list[ScreeningEntry] = []
61-
self._load_defaults()
62-
63-
def _load_defaults(self) -> None:
64-
"""Load default mock watchlist entries."""
65-
self._entries = [
66-
ScreeningEntry(
67-
name="Lazarus Group",
68-
list_type=ScreeningListType.SANCTIONS,
69-
jurisdiction="KP",
70-
identifiers=["0x" + "de" * 20, "lazarus.kp"],
71-
reason="State-sponsored cyber operations",
72-
),
73-
ScreeningEntry(
74-
name="Tornado Cash",
75-
list_type=ScreeningListType.SANCTIONS,
76-
jurisdiction="GLOBAL",
77-
identifiers=["0x" + "ca" * 20],
78-
reason="OFAC SDN listing — mixer service",
79-
),
80-
ScreeningEntry(
81-
name="Test PEP Entity",
82-
list_type=ScreeningListType.PEP,
83-
jurisdiction="XX",
84-
identifiers=["pep-test-001"],
85-
reason="Politically exposed person — test entry",
86-
),
87-
]
59+
def __init__(self, entries: list[ScreeningEntry] | None = None) -> None:
60+
"""Initialize the screener with caller-supplied watchlist entries."""
61+
self._entries = list(entries or [])
8862

8963
def add_entry(self, entry: ScreeningEntry) -> None:
9064
"""Add an entry to the watchlist.
@@ -105,6 +79,9 @@ def screen_name(self, name: str) -> ScreeningResult:
10579
Returns:
10680
ScreeningResult with any matches.
10781
"""
82+
if not self._entries:
83+
return self._unconfigured_result(name)
84+
10885
matches = [
10986
e for e in self._entries
11087
if name.lower() in e.name.lower() or e.name.lower() in name.lower()
@@ -120,6 +97,9 @@ def screen_identifier(self, identifier: str) -> ScreeningResult:
12097
Returns:
12198
ScreeningResult with any matches.
12299
"""
100+
if not self._entries:
101+
return self._unconfigured_result(identifier)
102+
123103
identifier_lower = identifier.lower()
124104
matches = [
125105
e for e in self._entries
@@ -138,6 +118,9 @@ def screen_jurisdiction(self, jurisdiction_code: str) -> ScreeningResult:
138118
Returns:
139119
ScreeningResult with any matches.
140120
"""
121+
if not self._entries:
122+
return self._unconfigured_result(jurisdiction_code)
123+
141124
code_upper = jurisdiction_code.upper()
142125
matches = [
143126
e for e in self._entries
@@ -146,3 +129,11 @@ def screen_jurisdiction(self, jurisdiction_code: str) -> ScreeningResult:
146129
return ScreeningResult(
147130
matched=len(matches) > 0, matches=matches, query=jurisdiction_code
148131
)
132+
133+
def _unconfigured_result(self, query: str) -> ScreeningResult:
134+
return ScreeningResult(
135+
matched=True,
136+
matches=[],
137+
query=query,
138+
error="screening watchlist entries are not configured",
139+
)

0 commit comments

Comments
 (0)