Skip to content

Commit 7bf43c2

Browse files
authored
Merge pull request #21 from airlock-protocol/worktree-agent-a0e41c0d
feat: add compliance module with agent inventory and risk reporting
2 parents d374524 + 0427340 commit 7bf43c2

14 files changed

Lines changed: 1739 additions & 0 deletions

airlock/compliance/__init__.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
from __future__ import annotations
2+
3+
"""RBI FREE-AI compliance module for the Airlock Protocol."""
4+
5+
from airlock.compliance.bias_detector import BiasDetector
6+
from airlock.compliance.free_ai_mapper import FreeAIMapper
7+
from airlock.compliance.incident import IncidentStore
8+
from airlock.compliance.inventory import AgentInventory
9+
from airlock.compliance.report_generator import ComplianceReportGenerator
10+
from airlock.compliance.risk_classifier import RiskClassifier
11+
from airlock.compliance.schemas import (
12+
AgentInventoryEntry,
13+
ComplianceReport,
14+
IncidentReport,
15+
RiskClassification,
16+
RiskLevel,
17+
)
18+
19+
__all__ = [
20+
"AgentInventory",
21+
"AgentInventoryEntry",
22+
"BiasDetector",
23+
"ComplianceReport",
24+
"ComplianceReportGenerator",
25+
"FreeAIMapper",
26+
"IncidentReport",
27+
"IncidentStore",
28+
"RiskClassification",
29+
"RiskClassifier",
30+
"RiskLevel",
31+
]
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
from __future__ import annotations
2+
3+
"""Bias detection for agent verification patterns."""
4+
5+
import logging
6+
import statistics
7+
from typing import Any
8+
9+
logger = logging.getLogger(__name__)
10+
11+
12+
class BiasDetector:
13+
"""Detects potential bias in verification outcomes and trust distributions."""
14+
15+
def analyze_verification_patterns(
16+
self,
17+
results: list[dict[str, Any]],
18+
) -> dict[str, Any]:
19+
"""Analyze verification results for potential bias.
20+
21+
Each result dict should have at least 'outcome' (str) and optionally
22+
'agent_type' (str) keys.
23+
"""
24+
if not results:
25+
return {
26+
"bias_detected": False,
27+
"bias_type": None,
28+
"confidence": 0.0,
29+
"details": "insufficient data",
30+
}
31+
32+
# Group by agent_type
33+
groups: dict[str, list[dict[str, Any]]] = {}
34+
for r in results:
35+
agent_type = str(r.get("agent_type", "unknown"))
36+
groups.setdefault(agent_type, []).append(r)
37+
38+
# Compute pass rates per group
39+
pass_rates: dict[str, float] = {}
40+
for group_name, group_results in groups.items():
41+
passed = sum(1 for r in group_results if r.get("outcome") == "verified")
42+
pass_rates[group_name] = passed / len(group_results) if group_results else 0.0
43+
44+
# Check for significant disparity
45+
if len(pass_rates) < 2:
46+
return {
47+
"bias_detected": False,
48+
"bias_type": None,
49+
"confidence": 0.0,
50+
"details": "single group, no comparison possible",
51+
"pass_rates": pass_rates,
52+
}
53+
54+
rates = list(pass_rates.values())
55+
max_disparity = max(rates) - min(rates)
56+
57+
bias_detected = max_disparity > 0.3
58+
bias_type = "outcome_disparity" if bias_detected else None
59+
confidence = min(max_disparity / 0.5, 1.0) if bias_detected else 0.0
60+
61+
return {
62+
"bias_detected": bias_detected,
63+
"bias_type": bias_type,
64+
"confidence": round(confidence, 3),
65+
"max_disparity": round(max_disparity, 3),
66+
"pass_rates": pass_rates,
67+
}
68+
69+
def analyze_trust_distribution(
70+
self,
71+
scores: list[float],
72+
) -> dict[str, Any]:
73+
"""Analyze the distribution of trust scores for fairness."""
74+
if not scores:
75+
return {
76+
"count": 0,
77+
"mean": 0.0,
78+
"median": 0.0,
79+
"std_dev": 0.0,
80+
"min": 0.0,
81+
"max": 0.0,
82+
"skew_warning": False,
83+
}
84+
85+
mean = statistics.mean(scores)
86+
median = statistics.median(scores)
87+
std_dev = statistics.stdev(scores) if len(scores) >= 2 else 0.0
88+
min_score = min(scores)
89+
max_score = max(scores)
90+
91+
# Flag if distribution is heavily skewed
92+
skew_warning = abs(mean - median) > 0.15
93+
94+
return {
95+
"count": len(scores),
96+
"mean": round(mean, 4),
97+
"median": round(median, 4),
98+
"std_dev": round(std_dev, 4),
99+
"min": round(min_score, 4),
100+
"max": round(max_score, 4),
101+
"skew_warning": skew_warning,
102+
}
Lines changed: 159 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
from __future__ import annotations
2+
3+
"""Maps Airlock Protocol features to RBI FREE-AI Framework sutras and recommendations."""
4+
5+
import logging
6+
from typing import Any
7+
8+
from airlock.compliance.incident import IncidentStore
9+
from airlock.compliance.inventory import AgentInventory
10+
11+
logger = logging.getLogger(__name__)
12+
13+
# RBI FREE-AI Framework: 7 Sutras
14+
SUTRAS: dict[str, str] = {
15+
"sutra_1": "Governance & Oversight",
16+
"sutra_2": "Risk Management",
17+
"sutra_3": "Data Governance",
18+
"sutra_4": "Model Development & Validation",
19+
"sutra_5": "Fairness & Bias",
20+
"sutra_6": "Transparency & Explainability",
21+
"sutra_7": "Accountability & Audit",
22+
}
23+
24+
# Selected RBI FREE-AI recommendations mapped to Airlock features
25+
RECOMMENDATION_MAP: dict[str, dict[str, str]] = {
26+
"rec_14": {
27+
"title": "AI Model Inventory",
28+
"airlock_feature": "agent_inventory",
29+
"sutra": "sutra_1",
30+
},
31+
"rec_15": {
32+
"title": "Risk Classification",
33+
"airlock_feature": "risk_classifier",
34+
"sutra": "sutra_2",
35+
},
36+
"rec_16": {
37+
"title": "Incident Reporting",
38+
"airlock_feature": "incident_store",
39+
"sutra": "sutra_2",
40+
},
41+
"rec_17": {
42+
"title": "Audit Trail",
43+
"airlock_feature": "audit_trail",
44+
"sutra": "sutra_7",
45+
},
46+
"rec_18": {
47+
"title": "Bias Detection",
48+
"airlock_feature": "bias_detector",
49+
"sutra": "sutra_5",
50+
},
51+
"rec_19": {
52+
"title": "Trust Scoring Transparency",
53+
"airlock_feature": "trust_scoring",
54+
"sutra": "sutra_6",
55+
},
56+
"rec_20": {
57+
"title": "Identity Verification",
58+
"airlock_feature": "did_verification",
59+
"sutra": "sutra_1",
60+
},
61+
"rec_21": {
62+
"title": "Capability Assessment",
63+
"airlock_feature": "vc_capability",
64+
"sutra": "sutra_4",
65+
},
66+
"rec_22": {
67+
"title": "Data Privacy Controls",
68+
"airlock_feature": "privacy_mode",
69+
"sutra": "sutra_3",
70+
},
71+
"rec_23": {
72+
"title": "Compliance Reporting",
73+
"airlock_feature": "compliance_reports",
74+
"sutra": "sutra_7",
75+
},
76+
}
77+
78+
79+
class FreeAIMapper:
80+
"""Maps Airlock compliance status to RBI FREE-AI framework."""
81+
82+
def map_compliance_status(
83+
self,
84+
inventory: AgentInventory,
85+
incident_store: IncidentStore,
86+
) -> dict[str, Any]:
87+
"""Map current compliance state to FREE-AI sutras and recommendations."""
88+
agents = inventory.list_all()
89+
incidents = incident_store.list_all()
90+
91+
sutra_status: dict[str, dict[str, Any]] = {}
92+
for sutra_id, sutra_name in SUTRAS.items():
93+
mapped_recs = [
94+
rec_id
95+
for rec_id, rec_data in RECOMMENDATION_MAP.items()
96+
if rec_data["sutra"] == sutra_id
97+
]
98+
sutra_status[sutra_id] = {
99+
"name": sutra_name,
100+
"recommendation_count": len(mapped_recs),
101+
"recommendations": mapped_recs,
102+
"status": "active" if agents else "pending",
103+
}
104+
105+
recommendation_status: dict[str, dict[str, Any]] = {}
106+
for rec_id, rec_data in RECOMMENDATION_MAP.items():
107+
recommendation_status[rec_id] = self.get_recommendation_status(
108+
rec_id,
109+
inventory=inventory,
110+
incident_store=incident_store,
111+
)
112+
113+
return {
114+
"framework": "RBI FREE-AI",
115+
"sutras": sutra_status,
116+
"recommendations": recommendation_status,
117+
"total_agents_tracked": len(agents),
118+
"total_incidents": len(incidents),
119+
}
120+
121+
def get_recommendation_status(
122+
self,
123+
rec_id: str,
124+
inventory: AgentInventory | None = None,
125+
incident_store: IncidentStore | None = None,
126+
) -> dict[str, Any]:
127+
"""Get the implementation status of a specific recommendation."""
128+
rec_data = RECOMMENDATION_MAP.get(rec_id)
129+
if rec_data is None:
130+
return {"error": f"Unknown recommendation: {rec_id}"}
131+
132+
feature = rec_data["airlock_feature"]
133+
implemented = True # All mapped features exist in the codebase
134+
active = False
135+
136+
if feature == "agent_inventory" and inventory is not None:
137+
active = len(inventory.list_all()) > 0
138+
elif feature == "incident_store" and incident_store is not None:
139+
active = len(incident_store.list_all()) > 0
140+
elif feature in (
141+
"risk_classifier",
142+
"bias_detector",
143+
"trust_scoring",
144+
"did_verification",
145+
"vc_capability",
146+
"privacy_mode",
147+
"audit_trail",
148+
"compliance_reports",
149+
):
150+
active = True # Core features are always active
151+
152+
return {
153+
"rec_id": rec_id,
154+
"title": rec_data["title"],
155+
"sutra": rec_data["sutra"],
156+
"airlock_feature": feature,
157+
"implemented": implemented,
158+
"active": active,
159+
}

0 commit comments

Comments
 (0)