Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
266 changes: 263 additions & 3 deletions src/bernstein/core/compliance.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,246 @@
raise ValueError(msg)


# ---------------------------------------------------------------------------
# SOC 2 Trust Services Criteria control mappings
# ---------------------------------------------------------------------------

# Maps SOC 2 control IDs to the artifact types that serve as evidence.
_SOC2_CONTROL_MAP: dict[str, dict[str, str]] = {
"CC6.1": {
"title": "Logical and Physical Access Controls",
"description": "The entity implements logical access security software, "
"infrastructure, and architectures over protected information assets.",
"evidence_types": "audit_logs,compliance_config",
},
"CC6.8": {
"title": "Controls Against Threats and Vulnerabilities",
"description": "The entity implements controls to prevent or detect and act upon "
"the introduction of unauthorized or malicious software.",
"evidence_types": "sbom,compliance_config",
},
"CC7.2": {
"title": "System Monitoring",
"description": "The entity monitors system components and the operation of those "
"components for anomalies that are indicative of malicious acts.",
"evidence_types": "audit_logs,wal",
},
"CC7.3": {
"title": "Detection and Response",
"description": "The entity evaluates anomalies and incidents to determine whether "
"they constitute security incidents.",
"evidence_types": "audit_logs,wal",
},
"CC8.1": {
"title": "Change Management",
"description": "The entity authorizes, designs, develops, configures, documents, "
"tests, approves, and implements changes to infrastructure and software.",
"evidence_types": "audit_logs,wal,sbom",
},
}


def _build_soc2_control_mappings(
artifacts: list[dict[str, Any]],
verification: dict[str, Any],
) -> list[dict[str, Any]]:
"""Build SOC 2 control-to-evidence mappings.

For each SOC 2 control, lists which artifacts in the package satisfy it
and whether the evidence is present and verified.

Args:
artifacts: Collected artifact metadata from the export.
verification: HMAC/Merkle verification results.

Returns:
List of control mapping dicts with status and evidence references.
"""
artifact_types = {a["type"] for a in artifacts}
hmac_valid = (
verification.get("hmac_chain", {}).get("valid", False)
if verification.get("hmac_chain")
else False
)

mappings: list[dict[str, Any]] = []
for control_id, meta in _SOC2_CONTROL_MAP.items():
required_types = set(meta["evidence_types"].split(","))
present = required_types & artifact_types
missing = required_types - artifact_types
satisfied = len(missing) == 0

mappings.append({
"control_id": control_id,
"title": meta["title"],
"description": meta["description"],
"satisfied": satisfied,
"evidence_present": sorted(present),
"evidence_missing": sorted(missing),
"integrity_verified": hmac_valid,
})

return mappings


def _build_merkle_attestation(merkle_dir: Path | None) -> dict[str, Any] | None:
"""Build a Merkle root attestation summary from seal files.

Reads all seal JSON files and produces an attestation record with the
latest root hash suitable for anchoring or external verification.

Args:
merkle_dir: Directory containing Merkle seal JSON files, or None.

Returns:
Attestation dict, or None if no seals exist.
"""
if merkle_dir is None or not merkle_dir.is_dir():
return None

seals: list[dict[str, Any]] = []
for seal_file in sorted(merkle_dir.glob("*.json")):

Check failure on line 609 in src/bernstein/core/compliance.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Define a constant instead of duplicating this literal "*.json" 3 times.

See more on https://sonarcloud.io/project/issues?id=chernistry_bernstein&issues=AZ1tnWyVnAsgo7AR9uLd&open=AZ1tnWyVnAsgo7AR9uLd&pullRequest=605
try:
seal_data = json.loads(seal_file.read_text())
seals.append({
"file": seal_file.name,
"root_hash": seal_data.get("root_hash", ""),
"sealed_at": seal_data.get("sealed_at_iso", ""),
"leaf_count": seal_data.get("leaf_count", 0),
"algorithm": seal_data.get("algorithm", "sha256"),
})
except (json.JSONDecodeError, OSError):
continue

if not seals:
return None

latest = seals[-1]
return {
"attestation_type": "merkle_root",
"latest_root_hash": latest["root_hash"],
"latest_sealed_at": latest["sealed_at"],
"total_seals": len(seals),
"seals": seals,
"generated_at": time.strftime(_ISO_TIMESTAMP_FMT, time.gmtime()),
}


def _build_evidence_summary(

Check failure on line 636 in src/bernstein/core/compliance.py

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Refactor this function to reduce its Cognitive Complexity from 27 to the 15 allowed.

See more on https://sonarcloud.io/project/issues?id=chernistry_bernstein&issues=AZ1tnWyVnAsgo7AR9uLe&open=AZ1tnWyVnAsgo7AR9uLe&pullRequest=605
period: str,
start_date: str,
end_date: str,
artifacts: list[dict[str, Any]],
verification: dict[str, Any],
control_mappings: list[dict[str, Any]],
merkle_attestation: dict[str, Any] | None,
) -> str:
"""Generate a PDF-ready Markdown evidence summary for auditors.

Args:
period: Original period string.
start_date: ISO start date.
end_date: ISO end date.
artifacts: Collected artifact metadata.
verification: HMAC/Merkle verification results.
control_mappings: SOC 2 control mapping results.
merkle_attestation: Merkle attestation data or None.

Returns:
Markdown string suitable for conversion to PDF.
"""
lines: list[str] = []
generated = time.strftime(_ISO_TIMESTAMP_FMT, time.gmtime())

lines.append(f"# SOC 2 Evidence Package — {period}")
lines.append("")
lines.append(f"**Period**: {start_date} to {end_date}")
lines.append(f"**Generated**: {generated}")
lines.append("**Package type**: SOC 2 Type II Evidence Bundle")
lines.append("")

# Executive summary
lines.append("## Executive Summary")
lines.append("")
total_controls = len(control_mappings)
satisfied = sum(1 for m in control_mappings if m["satisfied"])
hmac_chain = verification.get("hmac_chain")
hmac_status = "PASSED" if hmac_chain and hmac_chain.get("valid") else "NOT VERIFIED"
lines.append(
f"This package contains compliance evidence for the period "
f"{start_date} to {end_date}. "
f"**{satisfied}/{total_controls}** SOC 2 controls have supporting evidence. "
f"HMAC chain integrity: **{hmac_status}**."
)
lines.append("")

# Artifacts inventory
lines.append("## Evidence Artifacts")
lines.append("")
lines.append("| Artifact | Description | Files |")
lines.append("|----------|-------------|-------|")
for a in artifacts:
lines.append(f"| {a['type']} | {a['description']} | {a.get('file_count', 'N/A')} |")
if not artifacts:
lines.append("| *(none collected)* | | |")
lines.append("")

# Control mappings
lines.append("## SOC 2 Control Mappings")
lines.append("")
lines.append("| Control | Title | Status | Evidence Present | Missing |")
lines.append("|---------|-------|--------|------------------|---------|")
for m in control_mappings:
status = "Satisfied" if m["satisfied"] else "Gap"
present = ", ".join(m["evidence_present"]) or "—"
missing = ", ".join(m["evidence_missing"]) or "—"
lines.append(f"| {m['control_id']} | {m['title']} | {status} | {present} | {missing} |")
lines.append("")

# Integrity verification
lines.append("## Integrity Verification")
lines.append("")
hmac_info = verification.get("hmac_chain")
if hmac_info:
valid_str = "PASSED" if hmac_info.get("valid") else "FAILED"
lines.append(f"**HMAC Chain**: {valid_str}")
if hmac_info.get("errors"):
lines.append("")
for err in hmac_info["errors"]:
lines.append(f"- {err}")
verified_at = hmac_info.get("verified_at", "")
if verified_at:
lines.append("")
lines.append(f"Verified at: {verified_at}")
else:
lines.append("HMAC chain verification: *not performed*")
lines.append("")

# Merkle attestation
lines.append("## Merkle Root Attestation")
lines.append("")
if merkle_attestation:
lines.append(f"**Latest root hash**: `{merkle_attestation['latest_root_hash']}`")
lines.append(f"**Sealed at**: {merkle_attestation['latest_sealed_at']}")
lines.append(f"**Total seals**: {merkle_attestation['total_seals']}")
lines.append("")
lines.append("| Seal File | Root Hash | Sealed At |")
lines.append("|-----------|-----------|-----------|")
for s in merkle_attestation["seals"]:
root_short = s["root_hash"][:16] + "..." if len(s["root_hash"]) > 16 else s["root_hash"]
lines.append(f"| {s['file']} | `{root_short}` | {s['sealed_at']} |")
else:
lines.append("*No Merkle seals found for this period.*")
lines.append("")

# Footer
lines.append("---")
lines.append(f"*Generated by Bernstein Compliance Engine at {generated}*")
lines.append("")

return "\n".join(lines)


def export_soc2_package(
sdd_dir: Path,
period: str,
Expand Down Expand Up @@ -657,17 +897,35 @@
}
)

# --- 7. Write verification results -------------------------------------
# --- 7. SOC 2 control mappings ------------------------------------------
control_mappings = _build_soc2_control_mappings(artifacts_collected, verification)
(bundle_dir / "control_mappings.json").write_text(json.dumps(control_mappings, indent=2))

# --- 8. Merkle root attestation ----------------------------------------
merkle_attestation = _build_merkle_attestation(merkle_dir if merkle_dir.is_dir() else None)
if merkle_attestation:
(bundle_dir / "merkle_attestation.json").write_text(
json.dumps(merkle_attestation, indent=2)
)

# --- 9. Evidence summary (PDF-ready Markdown) --------------------------
evidence_summary = _build_evidence_summary(
period, start_date, end_date, artifacts_collected, verification,
control_mappings, merkle_attestation,
)
(bundle_dir / "evidence_summary.md").write_text(evidence_summary)

# --- 10. Write verification results ------------------------------------
(bundle_dir / "verification.json").write_text(json.dumps(verification, indent=2))

# --- 8. Compute package checksum ---------------------------------------
# --- 11. Compute package checksum --------------------------------------
file_checksums: dict[str, str] = {}
for path in sorted(bundle_dir.rglob("*")):
if path.is_file() and path.name != _MANIFEST_JSON:
digest = hashlib.sha256(path.read_bytes()).hexdigest()
file_checksums[str(path.relative_to(bundle_dir))] = digest

# --- 9. Write manifest -------------------------------------------------
# --- 12. Write manifest ------------------------------------------------
manifest = {
"package_type": "soc2-evidence",
"period": period,
Expand All @@ -676,6 +934,8 @@
"exported_at": time.strftime(_ISO_TIMESTAMP_FMT, time.gmtime()),
"artifacts": artifacts_collected,
"verification": verification,
"control_mappings": control_mappings,
"merkle_attestation": merkle_attestation,
"file_checksums": file_checksums,
}
(bundle_dir / _MANIFEST_JSON).write_text(json.dumps(manifest, indent=2))
Expand Down
19 changes: 19 additions & 0 deletions src/bernstein/core/orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -1463,6 +1463,25 @@ def run(self) -> None:
self._recover_from_wal()
except Exception:
logger.exception("WAL recovery failed (non-fatal) — continuing startup")
# Audit log integrity check: verify the last N HMAC-chained entries.
try:
from bernstein.core.audit_integrity import verify_on_startup

_integrity = verify_on_startup(self._workdir / ".sdd")
if not _integrity.valid:
logger.warning(
"Audit integrity check found %d error(s) — review with "
"'bernstein audit verify'",
len(_integrity.errors),
)
elif _integrity.entries_checked > 0:
logger.info(
"Audit integrity OK (%d entries verified in %.1fms)",
_integrity.entries_checked,
_integrity.duration_ms,
)
except Exception:
logger.exception("Audit integrity check failed (non-fatal) — continuing startup")
# Zombie cleanup: terminate orphaned agent processes from prior crashed runs.
try:
from bernstein.core.zombie_cleanup import scan_and_cleanup_zombies
Expand Down
Loading