|
| 1 | +"""Trace anonymization: strip identifying information from exported traces. |
| 2 | +
|
| 3 | +Applied at export time — original session data is never modified. |
| 4 | +Complements secret redaction (redact.py), which strips secrets at capture |
| 5 | +time. Anonymization strips identity: paths, hostnames, usernames, emails. |
| 6 | +
|
| 7 | +Rules applied: |
| 8 | + - Home directory paths → ~/relative/path |
| 9 | + - Hostnames (from socket.gethostname()) → <hostname> |
| 10 | + - Email addresses → <email> |
| 11 | + - OS username (from os.getlogin / env) → <user> |
| 12 | + - Custom regex patterns from .agent-strace/anonymize.yaml |
| 13 | +
|
| 14 | +Usage: |
| 15 | + agent-strace export <session-id> --anonymize --output trace.json |
| 16 | + agent-strace export <session-id> --anonymize --dry-run |
| 17 | +""" |
| 18 | + |
from __future__ import annotations

import json
import os
import re
import socket
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Callable, TextIO

from .models import TraceEvent, SessionMeta
from .store import TraceStore
| 32 | + |
| 33 | + |
| 34 | +# --------------------------------------------------------------------------- |
| 35 | +# Built-in anonymization rules |
| 36 | +# --------------------------------------------------------------------------- |
| 37 | + |
# An e-mail address: local part, "@", dotted domain, and a TLD of two or more letters.
_EMAIL_RE = re.compile(r"\b[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}\b")
| 41 | + |
| 42 | + |
def _get_home_dir() -> str:
    """Return the current user's home directory, or "" when it is unknown.

    ``Path.home()`` raises ``RuntimeError`` when the home directory cannot be
    resolved. That is reported as an empty string so the caller can skip the
    home-path rule instead of aborting the whole export.
    """
    try:
        return str(Path.home())
    except RuntimeError:
        return ""
| 45 | + |
| 46 | + |
def _get_hostname() -> str:
    """Return this machine's hostname, or an empty string if the lookup fails."""
    try:
        name = socket.gethostname()
    except Exception:
        name = ""
    return name
| 52 | + |
| 53 | + |
def _get_username() -> str:
    """Return the OS user name, or an empty string when none can be found.

    The first non-empty value among the USER, USERNAME and LOGNAME environment
    variables wins; ``os.getlogin()`` is only consulted when all are unset or
    empty.
    """
    from_env = (os.environ.get(name, "") for name in ("USER", "USERNAME", "LOGNAME"))
    env_name = next((value for value in from_env if value), "")
    if env_name:
        return env_name

    try:
        login = os.getlogin()
    except Exception:
        login = ""
    return login
| 63 | + |
| 64 | + |
@dataclass
class AnonymizationRule:
    """A single find-and-replace rule.

    ``replacement`` is passed unchanged to ``re.Pattern.subn``, so it may be a
    replacement template string (group references such as ``\\1`` are
    honoured) or a callable that receives the match object and returns the
    replacement text.
    """

    # Compiled regular expression; every match in a string is replaced.
    pattern: re.Pattern[str]
    # Template string or match -> str callable, exactly as re.sub accepts.
    replacement: str | Callable[[re.Match[str]], str]
    # Label under which replacements made by this rule are counted.
    description: str = ""
| 71 | + |
| 72 | + |
@dataclass
class AnonymizationResult:
    """Tally of the replacements performed, grouped by rule description."""

    # Maps a rule description to the number of replacements recorded for it.
    rules_applied: dict[str, int] = field(default_factory=dict)

    @property
    def total_replacements(self) -> int:
        """Number of replacements summed over every recorded rule."""
        total = 0
        for hits in self.rules_applied.values():
            total += hits
        return total

    def record(self, description: str, count: int = 1) -> None:
        """Add ``count`` to the running tally kept for ``description``."""
        previous = self.rules_applied.get(description, 0)
        self.rules_applied[description] = previous + count
| 84 | + |
| 85 | + |
def _build_builtin_rules() -> list[AnonymizationRule]:
    """Build the default set of anonymization rules from the current environment.

    Rules are applied in list order and each rule sees the output of the
    previous one, so the order is significant:

    1. Email addresses. They go first: if the username or hostname rule ran
       earlier it could rewrite part of an address (``bob@example.com`` ->
       ``<user>@example.com``), after which the email pattern no longer
       matches and the rest of the address would be left in the export.
    2. Home directory prefix -> ``~``.
    3. Hostname -> ``<hostname>``.
    4. Username (only when at least 3 characters long) -> ``<user>``.

    Rules whose source value is unavailable (empty home, hostname or
    username) are omitted.
    """
    rules: list[AnonymizationRule] = [
        AnonymizationRule(
            pattern=_EMAIL_RE,
            replacement="<email>",
            description="email addresses",
        ),
    ]

    # Home directory paths
    home = _get_home_dir()
    if home and home != "/":
        # Replace only the home prefix itself. Consuming the rest of the token
        # would hide further occurrences in PATH-like values such as
        # "<home>/bin:<home>/.local/bin". The lookahead stops "/home/bob" from
        # matching inside a sibling directory such as "/home/bobby".
        rules.append(AnonymizationRule(
            pattern=re.compile(re.escape(home) + r"(?![\w\-])"),
            replacement="~",
            description="home directory paths",
        ))

    # Hostname
    hostname = _get_hostname()
    if hostname:
        rules.append(AnonymizationRule(
            pattern=re.compile(re.escape(hostname)),
            replacement="<hostname>",
            description="hostname",
        ))

    # Username
    username = _get_username()
    if username and len(username) >= 3:
        # Only replace when it looks like a standalone word to avoid false positives
        rules.append(AnonymizationRule(
            pattern=re.compile(r"\b" + re.escape(username) + r"\b"),
            replacement="<user>",
            description="username",
        ))

    return rules
| 128 | + |
| 129 | + |
def _load_custom_rules(config_path: str | Path | None = None) -> list[AnonymizationRule]:
    """Load custom rules from .agent-strace/anonymize.yaml or the given path.

    Candidates are tried in this order: ``config_path`` (when given), then
    ``.agent-strace/anonymize.yaml`` and ``.agent-strace/anonymize.yml``
    relative to the current working directory. The first candidate that loads
    wins, even if it defines no rules.

    Loading stays best-effort: a candidate that cannot be read or parsed is
    reported on stderr and the next one is tried, so a broken config never
    aborts the export. An explicitly requested ``config_path`` that does not
    exist is reported as well.

    Returns:
        The parsed rules, or an empty list when no candidate could be loaded.
    """
    candidates: list[Path] = []
    if config_path:
        explicit = Path(config_path)
        if not explicit.is_file():
            sys.stderr.write(f"Warning: anonymization config not found: {explicit}\n")
        candidates.append(explicit)
    candidates.append(Path(".agent-strace/anonymize.yaml"))
    candidates.append(Path(".agent-strace/anonymize.yml"))

    for candidate in candidates:
        if not candidate.is_file():
            continue
        try:
            # Explicit encoding: the platform default is not UTF-8 everywhere.
            return _parse_custom_rules(candidate.read_text(encoding="utf-8"))
        except Exception as exc:  # best-effort by design; report and move on
            sys.stderr.write(
                f"Warning: could not load anonymization config {candidate}: {exc}\n"
            )
    return []
| 147 | + |
| 148 | + |
def _unquote_scalar(value: str) -> str:
    """Strip surrounding whitespace and at most one pair of matching quotes.

    Quotes that are part of the value itself are kept, so a pattern such as
    ``"token='[^']+'"`` keeps its trailing single quote.
    """
    value = value.strip()
    if len(value) >= 2 and value[0] == value[-1] and value[0] in "\"'":
        return value[1:-1]
    return value


def _parse_rule_mappings(text: str) -> list[dict[str, str]]:
    """Collect the raw key/value pairs of each entry under ``rules:``."""
    mappings: list[dict[str, str]] = []
    current: dict[str, str] = {}
    in_rules = False

    for raw_line in text.splitlines():
        stripped = raw_line.strip()
        if not stripped or stripped.startswith("#"):
            continue
        if stripped == "rules:":
            in_rules = True
            continue
        if not in_rules:
            continue

        is_item = stripped == "-" or stripped.startswith("- ")
        if not is_item and not raw_line[0].isspace():
            # An unindented key starts another top-level section; its keys
            # must not be merged into the last rule.
            in_rules = False
            continue

        if is_item:
            if current:
                mappings.append(current)
            current = {}
            stripped = stripped[1:].strip()
        if ":" in stripped:
            # Split on the first colon only: values such as "https?://x"
            # contain colons of their own.
            key, _, value = stripped.partition(":")
            current[key.strip()] = _unquote_scalar(value)

    if current:
        mappings.append(current)
    return mappings


def _parse_custom_rules(text: str) -> list[AnonymizationRule]:
    """Parse a minimal YAML rules file into AnonymizationRule objects.

    Only the subset below is understood; this is not a general YAML parser.
    Blank lines and lines starting with ``#`` are skipped, and everything
    outside the ``rules:`` section is ignored.

    Expected format:
     rules:
     - pattern: "regex here"
       replacement: "<REDACTED>"

    Each collected entry is handed to ``_try_add_rule``.
    """
    rules: list[AnonymizationRule] = []
    for mapping in _parse_rule_mappings(text):
        _try_add_rule(mapping, rules)
    return rules
| 187 | + |
| 188 | + |
def _try_add_rule(d: dict, rules: list[AnonymizationRule]) -> None:
    """Compile the rule described by ``d`` and append it to ``rules``.

    Keys read from ``d``:
        pattern: regular expression source; an entry without one is ignored.
        replacement: replacement text, default ``"<REDACTED>"``.
        description: label for the summary, default ``"custom: "`` followed
            by the first 40 characters of the pattern.

    A pattern that does not compile is skipped with a warning on stderr.
    Dropping it silently would let the export proceed while the user believes
    the pattern is being scrubbed.
    """
    pattern_str = d.get("pattern", "")
    if not pattern_str:
        return

    try:
        compiled = re.compile(pattern_str)
    except (re.error, OverflowError) as exc:
        # OverflowError: CPython raises it for oversized repetition counts.
        sys.stderr.write(
            f"Warning: skipping invalid anonymization pattern {pattern_str!r}: {exc}\n"
        )
        return

    rules.append(AnonymizationRule(
        pattern=compiled,
        replacement=d.get("replacement", "<REDACTED>"),
        description=d.get("description", f"custom: {pattern_str[:40]}"),
    ))
| 202 | + |
| 203 | + |
| 204 | +# --------------------------------------------------------------------------- |
| 205 | +# Core anonymization engine |
| 206 | +# --------------------------------------------------------------------------- |
| 207 | + |
def _apply_rules_to_string(
    text: str,
    rules: list[AnonymizationRule],
    result: AnonymizationResult,
) -> str:
    """Run every rule over ``text`` in order and return the rewritten string.

    Each rule operates on the output of the one before it. Whenever a rule
    replaces something, the number of replacements is recorded on ``result``
    under that rule's description.
    """
    scrubbed = text
    for rule in rules:
        scrubbed, hits = rule.pattern.subn(rule.replacement, scrubbed)
        if hits:
            result.record(rule.description, hits)
    return scrubbed
| 220 | + |
| 221 | + |
def _anonymize_value(
    value: Any,
    rules: list[AnonymizationRule],
    result: AnonymizationResult,
) -> Any:
    """Return an anonymized copy of ``value``, descending into containers.

    Strings are rewritten by the rules. Lists and dicts are rebuilt with
    their members anonymized; dict keys are kept as they are. Any other
    value is returned unchanged.
    """
    if isinstance(value, list):
        return [_anonymize_value(member, rules, result) for member in value]

    if isinstance(value, dict):
        cleaned = {}
        for key, member in value.items():
            cleaned[key] = _anonymize_value(member, rules, result)
        return cleaned

    if isinstance(value, str):
        return _apply_rules_to_string(value, rules, result)

    return value
| 235 | + |
| 236 | + |
def anonymize_event(
    event: TraceEvent,
    rules: list[AnonymizationRule],
    result: AnonymizationResult,
) -> TraceEvent:
    """Produce an anonymized shallow copy of ``event``.

    Only ``event.data`` is run through the rules; every other attribute is
    carried over as-is by the shallow copy. The event passed in is not
    modified.
    """
    import copy

    scrubbed_data = _anonymize_value(event.data, rules, result)
    clone = copy.copy(event)
    clone.data = scrubbed_data
    return clone
| 249 | + |
| 250 | + |
def anonymize_session(
    store: TraceStore,
    session_id: str,
    custom_config: str | Path | None = None,
) -> tuple[list[TraceEvent], AnonymizationResult]:
    """Load a session's events and anonymize each of them.

    The rule set is the built-in rules followed by any custom rules loaded
    via ``custom_config``.

    Returns:
        ``(anonymized_events, summary)``; the store itself is only read.
    """
    loaded = store.load_events(session_id)
    active_rules = [*_build_builtin_rules(), *_load_custom_rules(custom_config)]
    summary = AnonymizationResult()

    anonymized: list[TraceEvent] = []
    for event in loaded:
        anonymized.append(anonymize_event(event, active_rules, summary))
    return anonymized, summary
| 265 | + |
| 266 | + |
| 267 | +# --------------------------------------------------------------------------- |
| 268 | +# CLI handler |
| 269 | +# --------------------------------------------------------------------------- |
| 270 | + |
def _write_anonymization_summary(
    out: TextIO,
    session_id: str,
    result: AnonymizationResult,
    heading: str,
) -> None:
    """Write the per-rule replacement counts for a session to ``out``."""
    out.write(f"Anonymizing session {session_id[:16]}...\n\n")
    if result.total_replacements == 0:
        out.write("Nothing to anonymize.\n")
        return
    out.write(f"{heading}\n")
    for desc, count in sorted(result.rules_applied.items()):
        out.write(f" {count:3d} {desc}\n")


def cmd_anonymize_export(args, out: TextIO | None = None) -> int:
    """Export a session with anonymization applied.

    Attributes read from ``args``:
        trace_dir: directory handed to ``TraceStore``.
        session_id: optional; the latest session is used when absent, and a
            value that is not an exact id is resolved via ``find_session``.
        anonymize_config: optional path to a custom rules file.
        dry_run: when true, only report what would be redacted.
        output: optional target path; defaults to
            ``session-<first 12 chars of id>-anon.json``.

    Args:
        out: stream for the summary. Defaults to the current ``sys.stdout``,
            looked up at call time so later redirection of stdout is
            respected.

    Returns:
        0 on success, 1 when no session could be resolved (the reason is
        written to stderr).
    """
    if out is None:
        out = sys.stdout

    store = TraceStore(args.trace_dir)

    session_id = getattr(args, "session_id", None)
    if not session_id:
        session_id = store.get_latest_session_id()
        if not session_id:
            sys.stderr.write("No sessions found.\n")
            return 1

    if not store.session_exists(session_id):
        found = store.find_session(session_id)
        if not found:
            sys.stderr.write(f"Session not found: {session_id}\n")
            return 1
        session_id = found

    custom_config = getattr(args, "anonymize_config", None)
    dry_run = getattr(args, "dry_run", False)

    anonymized_events, result = anonymize_session(store, session_id, custom_config)

    # The session metadata is written next to the events in a file flagged
    # "anonymized", so it has to pass through the same rules. Doing this
    # before the dry-run branch keeps the dry-run counts equal to what a
    # real export would redact.
    meta = store.load_meta(session_id)
    rules = _build_builtin_rules() + _load_custom_rules(custom_config)
    anonymized_meta = _anonymize_value(json.loads(meta.to_json()), rules, result)

    if dry_run:
        _write_anonymization_summary(out, session_id, result, "Would redact:")
        return 0

    output_path = getattr(args, "output", "") or f"session-{session_id[:12]}-anon.json"

    export_data = {
        "session_id": session_id,
        "meta": anonymized_meta,
        "anonymized": True,
        "events": [json.loads(ev.to_json()) for ev in anonymized_events],
    }

    Path(output_path).write_text(json.dumps(export_data, indent=2), encoding="utf-8")

    _write_anonymization_summary(out, session_id, result, "Redacted:")
    out.write(f"\nExported anonymized trace to {output_path}\n")
    return 0
0 commit comments