Skip to content

Commit f7c62c3

Browse files
committed
add compliance module with agent inventory, risk classification, and FREE-AI reporting
1 parent 6760e43 commit f7c62c3

14 files changed

Lines changed: 1902 additions & 0 deletions

airlock/compliance/__init__.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
"""RBI FREE-AI compliance module for the Airlock Protocol."""
2+
3+
from __future__ import annotations
4+
5+
from airlock.compliance.bias_detector import BiasDetector
6+
from airlock.compliance.free_ai_mapper import FreeAIMapper
7+
from airlock.compliance.incident import IncidentStore
8+
from airlock.compliance.inventory import AgentInventory
9+
from airlock.compliance.report_generator import ComplianceReportGenerator
10+
from airlock.compliance.risk_classifier import RiskClassifier
11+
from airlock.compliance.schemas import (
12+
AgentInventoryEntry,
13+
ComplianceReport,
14+
IncidentReport,
15+
RiskClassification,
16+
RiskLevel,
17+
)
18+
19+
__all__ = [
20+
"AgentInventory",
21+
"AgentInventoryEntry",
22+
"BiasDetector",
23+
"ComplianceReport",
24+
"ComplianceReportGenerator",
25+
"FreeAIMapper",
26+
"IncidentReport",
27+
"IncidentStore",
28+
"RiskClassification",
29+
"RiskClassifier",
30+
"RiskLevel",
31+
]
Lines changed: 133 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,133 @@
1+
"""Basic bias detection for AI agent behavior patterns -- FREE-AI Rec #15."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
import statistics
7+
from typing import Any
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class BiasDetector:
13+
"""Basic bias detection for AI agent behavior patterns."""
14+
15+
def analyze_verification_patterns(
16+
self, verification_results: list[dict[str, Any]]
17+
) -> dict[str, Any]:
18+
"""Detect statistical bias in verification outcomes.
19+
20+
Looks for skewed pass/fail rates that could indicate systematic bias
21+
in the verification process.
22+
"""
23+
if not verification_results:
24+
return {
25+
"bias_detected": False,
26+
"bias_type": "none",
27+
"confidence": 0.0,
28+
"details": "No verification data available",
29+
}
30+
31+
total = len(verification_results)
32+
passed = sum(1 for r in verification_results if r.get("result") == "passed")
33+
failed = total - passed
34+
35+
pass_rate = passed / total if total > 0 else 0.0
36+
37+
# Check for extreme skew (>90% or <10% pass rate with sufficient data)
38+
if total >= 10 and (pass_rate > 0.9 or pass_rate < 0.1):
39+
bias_type = "high_pass_rate" if pass_rate > 0.9 else "high_fail_rate"
40+
return {
41+
"bias_detected": True,
42+
"bias_type": bias_type,
43+
"confidence": min(0.5 + (total / 100), 0.95),
44+
"details": (
45+
f"Verification pass rate is {pass_rate:.1%} "
46+
f"across {total} verifications (passed={passed}, failed={failed})"
47+
),
48+
}
49+
50+
return {
51+
"bias_detected": False,
52+
"bias_type": "none",
53+
"confidence": min(0.5 + (total / 100), 0.95),
54+
"details": (
55+
f"Verification pass rate is {pass_rate:.1%} "
56+
f"across {total} verifications -- within expected range"
57+
),
58+
}
59+
60+
def analyze_trust_distribution(
61+
self, trust_scores: list[float]
62+
) -> dict[str, Any]:
63+
"""Analyze trust score distribution for anomalies.
64+
65+
Checks for abnormal clustering, extreme values, or unexpected
66+
distribution patterns that could indicate bias.
67+
"""
68+
if not trust_scores:
69+
return {
70+
"anomaly_detected": False,
71+
"distribution_type": "unknown",
72+
"details": "No trust score data available",
73+
}
74+
75+
if len(trust_scores) < 2:
76+
return {
77+
"anomaly_detected": False,
78+
"distribution_type": "insufficient_data",
79+
"mean": trust_scores[0] if trust_scores else 0.0,
80+
"details": "Insufficient data for distribution analysis",
81+
}
82+
83+
mean = statistics.mean(trust_scores)
84+
stdev = statistics.stdev(trust_scores)
85+
median = statistics.median(trust_scores)
86+
87+
# Detect clustering at extremes
88+
low_cluster = sum(1 for s in trust_scores if s < 0.2) / len(trust_scores)
89+
high_cluster = sum(1 for s in trust_scores if s > 0.8) / len(trust_scores)
90+
91+
anomaly = False
92+
distribution_type = "normal"
93+
details_parts: list[str] = []
94+
95+
if stdev < 0.05 and len(trust_scores) >= 5:
96+
anomaly = True
97+
distribution_type = "uniform_cluster"
98+
details_parts.append(
99+
f"Scores are unusually clustered (stdev={stdev:.3f})"
100+
)
101+
102+
if low_cluster > 0.5:
103+
anomaly = True
104+
distribution_type = "low_skew"
105+
details_parts.append(
106+
f"{low_cluster:.0%} of scores below 0.2"
107+
)
108+
109+
if high_cluster > 0.8:
110+
anomaly = True
111+
distribution_type = "high_skew"
112+
details_parts.append(
113+
f"{high_cluster:.0%} of scores above 0.8"
114+
)
115+
116+
if abs(mean - median) > 0.2:
117+
anomaly = True
118+
distribution_type = "skewed"
119+
details_parts.append(
120+
f"Mean-median gap of {abs(mean - median):.2f} suggests skew"
121+
)
122+
123+
if not details_parts:
124+
details_parts.append("Distribution appears normal")
125+
126+
return {
127+
"anomaly_detected": anomaly,
128+
"distribution_type": distribution_type,
129+
"mean": round(mean, 4),
130+
"stdev": round(stdev, 4),
131+
"median": round(median, 4),
132+
"details": "; ".join(details_parts),
133+
}
Lines changed: 168 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,168 @@
1+
"""Maps Airlock data to RBI FREE-AI 7 Sutras and 26 recommendations."""
2+
3+
from __future__ import annotations
4+
5+
import logging
6+
from typing import Any
7+
8+
from airlock.compliance.incident import IncidentStore
9+
from airlock.compliance.inventory import AgentInventory
10+
11+
logger = logging.getLogger(__name__)
12+
13+
# FREE-AI Sutras
14+
SUTRAS: dict[str, str] = {
15+
"sutra_1": "Governance & Oversight",
16+
"sutra_2": "Risk Management",
17+
"sutra_3": "Data Privacy & Security",
18+
"sutra_4": "Transparency & Explainability",
19+
"sutra_5": "Fairness & Non-discrimination",
20+
"sutra_6": "Accountability & Audit",
21+
"sutra_7": "Consumer Protection",
22+
}
23+
24+
# Recommendation -> Airlock feature mapping
25+
RECOMMENDATION_MAP: dict[str, dict[str, str]] = {
26+
"rec_13": {
27+
"title": "Incident Reporting",
28+
"airlock_feature": "incident_tracking",
29+
"sutra": "sutra_2",
30+
},
31+
"rec_14": {
32+
"title": "AI Model Inventory",
33+
"airlock_feature": "agent_inventory",
34+
"sutra": "sutra_1",
35+
},
36+
"rec_15": {
37+
"title": "Auditor Assessment by Risk Profile",
38+
"airlock_feature": "risk_classification",
39+
"sutra": "sutra_2",
40+
},
41+
"rec_16": {
42+
"title": "AI Disclosures in Annual Reports",
43+
"airlock_feature": "compliance_reports",
44+
"sutra": "sutra_4",
45+
},
46+
"rec_19": {
47+
"title": "Internal Audit Proportional to Risk",
48+
"airlock_feature": "audit_trail",
49+
"sutra": "sutra_6",
50+
},
51+
"rec_20": {
52+
"title": "Red Teaming",
53+
"airlock_feature": "adversarial_testing",
54+
"sutra": "sutra_2",
55+
},
56+
}
57+
58+
59+
class FreeAIMapper:
60+
"""Map Airlock compliance state to FREE-AI recommendations."""
61+
62+
def map_compliance_status(
63+
self,
64+
inventory: AgentInventory,
65+
incident_store: IncidentStore,
66+
) -> dict[str, dict[str, Any]]:
67+
"""Map current Airlock state to FREE-AI recommendation compliance status.
68+
69+
Returns a dict keyed by recommendation ID with compliance details.
70+
"""
71+
result: dict[str, dict[str, Any]] = {}
72+
for rec_id, rec_info in RECOMMENDATION_MAP.items():
73+
result[rec_id] = self.get_recommendation_status(
74+
rec_id, inventory, incident_store
75+
)
76+
return result
77+
78+
def get_recommendation_status(
79+
self,
80+
rec_id: str,
81+
inventory: AgentInventory,
82+
incident_store: IncidentStore,
83+
) -> dict[str, Any]:
84+
"""Get compliance status for a specific recommendation."""
85+
rec_info = RECOMMENDATION_MAP.get(rec_id)
86+
if rec_info is None:
87+
return {
88+
"status": "unknown",
89+
"title": "Unknown Recommendation",
90+
"details": f"Recommendation {rec_id} not mapped",
91+
}
92+
93+
feature = rec_info["airlock_feature"]
94+
sutra = rec_info["sutra"]
95+
title = rec_info["title"]
96+
97+
status = self._assess_feature_status(feature, inventory, incident_store)
98+
99+
return {
100+
"rec_id": rec_id,
101+
"title": title,
102+
"sutra": sutra,
103+
"sutra_name": SUTRAS.get(sutra, "Unknown"),
104+
"airlock_feature": feature,
105+
"status": status,
106+
}
107+
108+
def get_sutra_summary(
109+
self,
110+
inventory: AgentInventory,
111+
incident_store: IncidentStore,
112+
) -> dict[str, dict[str, Any]]:
113+
"""Get compliance summary grouped by Sutra."""
114+
full_mapping = self.map_compliance_status(inventory, incident_store)
115+
116+
sutra_summary: dict[str, dict[str, Any]] = {}
117+
for sutra_id, sutra_name in SUTRAS.items():
118+
recs = [
119+
v
120+
for v in full_mapping.values()
121+
if v.get("sutra") == sutra_id
122+
]
123+
compliant = sum(1 for r in recs if r.get("status") == "compliant")
124+
total = len(recs)
125+
sutra_summary[sutra_id] = {
126+
"name": sutra_name,
127+
"recommendations": total,
128+
"compliant": compliant,
129+
"status": "compliant" if compliant == total and total > 0 else "partial",
130+
}
131+
return sutra_summary
132+
133+
def _assess_feature_status(
134+
self,
135+
feature: str,
136+
inventory: AgentInventory,
137+
incident_store: IncidentStore,
138+
) -> str:
139+
"""Assess whether a specific Airlock feature meets compliance."""
140+
if feature == "agent_inventory":
141+
# Compliant if at least one agent is registered
142+
return "compliant" if len(inventory) > 0 else "not_implemented"
143+
144+
if feature == "risk_classification":
145+
# Compliant if agents have been assessed
146+
entries = inventory.list_all()
147+
if not entries:
148+
return "not_implemented"
149+
assessed = sum(1 for e in entries if e.last_assessed_at is not None)
150+
return "compliant" if assessed > 0 else "partial"
151+
152+
if feature == "incident_tracking":
153+
# Feature is available (store exists), compliant
154+
return "compliant"
155+
156+
if feature == "compliance_reports":
157+
# Feature is available (generator exists), compliant
158+
return "compliant"
159+
160+
if feature == "audit_trail":
161+
# Audit trail is always available in Airlock
162+
return "compliant"
163+
164+
if feature == "adversarial_testing":
165+
# Red teaming is a process, not a feature -- mark as partial
166+
return "partial"
167+
168+
return "unknown"

0 commit comments

Comments
 (0)