Skip to content

Commit 5e07ff2

Browse files
Siddhant-K-code and Ona (ona-agent)
authored
feat: v0.13.0 — auto-generate .agent-scope.json from observed traces (#29)
* feat: auto-generate .agent-scope.json from observed traces Adds policy.py with suggest_policy() that analyses one or more sessions and produces a minimal allow-list covering files read/written, commands run, and network hosts observed. Paths are collapsed to glob patterns (e.g. src/**) when 3+ files share a directory. Commands are collapsed to base-executable patterns (e.g. pytest *). CLI: agent-strace policy [session-ids...] [--output] [--dry-run] Closes #19 Co-authored-by: Ona <no-reply@ona.com> * fix: default to dry-run when --output not specified Without --output, cmd_policy now prints the suggested policy instead of silently writing to .agent-scope.json. This prevents accidental overwrites when the user just wants to inspect the suggestion. Co-authored-by: Ona <no-reply@ona.com> --------- Co-authored-by: Ona <ona@gitpod.io> Co-authored-by: Ona <no-reply@ona.com>
1 parent eb854db commit 5e07ff2

2 files changed

Lines changed: 467 additions & 0 deletions

File tree

src/agent_trace/policy.py

Lines changed: 316 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,316 @@
1+
"""Policy suggestion: auto-generate .agent-scope.json from observed traces.
2+
3+
Analyses one or more sessions and produces a minimal allow-list policy that
4+
covers exactly the files read/written and commands run. The output is a valid
5+
.agent-scope.json that can be used directly with `agent-strace audit`.
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import argparse
11+
import fnmatch
12+
import json
13+
import sys
14+
from dataclasses import dataclass, field
15+
from pathlib import Path
16+
from typing import TextIO
17+
18+
from .models import EventType, TraceEvent
19+
from .store import TraceStore
20+
21+
22+
# ---------------------------------------------------------------------------
23+
# Data structures
24+
# ---------------------------------------------------------------------------
25+
26+
@dataclass
class PolicySuggestion:
    """Observations gathered from traced sessions plus the collapsed patterns.

    Instances are built by ``suggest_policy`` and consumed by
    ``render_policy_json`` and ``format_suggestion``.
    """

    # Session ids the suggestion was derived from.
    session_ids: list[str]
    # Raw observations (``suggest_policy`` de-duplicates them, keeping
    # first-seen order).
    files_read: list[str]
    files_written: list[str]
    commands: list[str]
    network_hosts: list[str]
    # Collapsed glob patterns (e.g. ``src/**`` instead of individual files)
    file_read_patterns: list[str]
    file_write_patterns: list[str]
    # Collapsed command patterns (e.g. ``pytest *``).
    cmd_patterns: list[str]
37+
38+
39+
# ---------------------------------------------------------------------------
40+
# Pattern collapsing helpers
41+
# ---------------------------------------------------------------------------
42+
43+
# Conventional top-level project directory names.
# NOTE(review): nothing else in this module appears to reference this list —
# confirm whether it is still needed before relying on it.
_COMMON_DIRS = [
    "src", "tests", "lib", "app", "pkg", "internal",
    "components", "pages", "utils", "helpers",
]
47+
48+
49+
def _collapse_paths(paths: list[str]) -> list[str]:
50+
"""Collapse a list of file paths into minimal glob patterns.
51+
52+
Groups files by directory prefix and emits ``dir/**`` when 3+ files share
53+
the same top-level directory. Individual files are kept as-is otherwise.
54+
"""
55+
if not paths:
56+
return []
57+
58+
from collections import Counter
59+
dir_counts: Counter = Counter()
60+
for p in paths:
61+
parts = Path(p).parts
62+
if len(parts) > 1:
63+
dir_counts[parts[0]] += 1
64+
65+
collapsed: list[str] = []
66+
covered: set[str] = set()
67+
68+
for top_dir, count in dir_counts.items():
69+
if count >= 3:
70+
collapsed.append(f"{top_dir}/**")
71+
covered.update(p for p in paths if Path(p).parts[0] == top_dir)
72+
73+
for p in paths:
74+
if p not in covered:
75+
collapsed.append(p)
76+
77+
return sorted(set(collapsed))
78+
79+
80+
def _collapse_commands(cmds: list[str]) -> list[str]:
81+
"""Collapse commands to their base executable (first token).
82+
83+
e.g. ``pytest tests/foo.py -x`` → ``pytest *``
84+
Deduplicates by base command and emits ``<cmd> *`` patterns.
85+
"""
86+
if not cmds:
87+
return []
88+
89+
bases: dict[str, list[str]] = {}
90+
for cmd in cmds:
91+
base = cmd.strip().split()[0] if cmd.strip() else cmd
92+
bases.setdefault(base, []).append(cmd)
93+
94+
patterns: list[str] = []
95+
for base, variants in bases.items():
96+
if len(variants) == 1 and variants[0] == base:
97+
patterns.append(base)
98+
else:
99+
patterns.append(f"{base} *")
100+
101+
return sorted(patterns)
102+
103+
104+
# ---------------------------------------------------------------------------
105+
# Observation pass
106+
# ---------------------------------------------------------------------------
107+
108+
def _extract_url_host(cmd: str) -> list[str]:
109+
"""Extract hostnames from URLs in a shell command."""
110+
import re
111+
hosts = []
112+
for m in re.finditer(r"https?://([^/\s\"']+)", cmd):
113+
host = m.group(1).split(":")[0]
114+
hosts.append(host)
115+
return hosts
116+
117+
118+
def observe_session(store: TraceStore, session_id: str) -> dict:
    """Collect the raw observations recorded for one session.

    Only ``TOOL_CALL`` events are inspected. Depending on the (lower-cased)
    tool name, the call contributes to one of the result lists:

    * ``read`` / ``view`` / ``grep`` / ``glob`` → ``files_read`` (first truthy
      of the ``file_path``, ``path``, ``pattern`` arguments; ``/proc`` and
      ``/sys`` prefixes are ignored),
    * ``write`` / ``edit`` / ``create`` → ``files_written``,
    * ``bash`` → ``commands``, plus any URL hosts found in the command →
      ``network_hosts``.

    Returns a dict with the keys ``files_read``, ``files_written``,
    ``commands`` and ``network_hosts``; values keep event order and may
    contain duplicates.
    """
    observed: dict = {
        "files_read": [],
        "files_written": [],
        "commands": [],
        "network_hosts": [],
    }
    read_tools = ("read", "view", "grep", "glob")
    write_tools = ("write", "edit", "create")

    for event in store.load_events(session_id):
        if event.event_type != EventType.TOOL_CALL:
            continue

        tool = event.data.get("tool_name", "").lower()
        # "arguments" may be present but null; treat that as no arguments.
        tool_args = event.data.get("arguments", {}) or {}

        if tool in read_tools:
            target = (
                tool_args.get("file_path")
                or tool_args.get("path")
                or tool_args.get("pattern")
                or ""
            )
            target = str(target).strip()
            if target and not target.startswith(("/proc", "/sys")):
                observed["files_read"].append(target)
        elif tool in write_tools:
            target = str(tool_args.get("file_path") or tool_args.get("path") or "").strip()
            if target:
                observed["files_written"].append(target)
        elif tool == "bash":
            command = str(tool_args.get("command", "")).strip()
            if command:
                observed["commands"].append(command)
                observed["network_hosts"].extend(_extract_url_host(command))

    return observed
156+
157+
158+
# ---------------------------------------------------------------------------
159+
# Public API
160+
# ---------------------------------------------------------------------------
161+
162+
def suggest_policy(
    store: TraceStore,
    session_ids: list[str],
) -> PolicySuggestion:
    """Build a PolicySuggestion from the observations of *session_ids*.

    Observations of all sessions are concatenated in the given order,
    de-duplicated (first occurrence wins), and then collapsed into file and
    command patterns.
    """
    keys = ("files_read", "files_written", "commands", "network_hosts")
    merged: dict = {key: [] for key in keys}

    for sid in session_ids:
        session_obs = observe_session(store, sid)
        for key in keys:
            merged[key].extend(session_obs[key])

    # dict.fromkeys keeps first-seen order while dropping repeats.
    reads, writes, cmds, hosts = (
        list(dict.fromkeys(merged[key])) for key in keys
    )

    return PolicySuggestion(
        session_ids=session_ids,
        files_read=reads,
        files_written=writes,
        commands=cmds,
        network_hosts=hosts,
        file_read_patterns=_collapse_paths(reads),
        file_write_patterns=_collapse_paths(writes),
        cmd_patterns=_collapse_commands(cmds),
    )
204+
205+
206+
def render_policy_json(suggestion: PolicySuggestion) -> dict:
    """Build the .agent-scope.json dict for *suggestion*.

    Sections with nothing to allow are omitted entirely. When any network
    host was observed, the ``network`` section denies everything except the
    sorted, de-duplicated host list.
    """
    file_rules = {
        mode: {"allow": patterns}
        for mode, patterns in (
            ("read", suggestion.file_read_patterns),
            ("write", suggestion.file_write_patterns),
        )
        if patterns
    }

    rendered: dict = {}
    if file_rules:
        rendered["files"] = file_rules
    if suggestion.cmd_patterns:
        rendered["commands"] = {"allow": suggestion.cmd_patterns}
    if suggestion.network_hosts:
        rendered["network"] = {
            "deny_all": True,
            "allow": sorted(set(suggestion.network_hosts)),
        }
    return rendered
228+
229+
230+
# ---------------------------------------------------------------------------
231+
# Formatting
232+
# ---------------------------------------------------------------------------
233+
234+
def format_suggestion(
    suggestion: PolicySuggestion,
    out: TextIO = sys.stdout,
    dry_run: bool = False,
) -> None:
    """Write a human-readable summary of *suggestion* to *out*.

    A header with the session count is followed by one section per non-empty
    category (files read, files written, commands, network hosts). With
    *dry_run* set, the generated policy JSON is appended as well.
    """
    emit = out.write

    count = len(suggestion.session_ids)
    plural = "s" if count != 1 else ""
    emit(f"\nPolicy suggestion from {count} session{plural}:\n\n")

    sections = (
        (" Files read (allow):\n", suggestion.file_read_patterns),
        (" Files written (allow):\n", suggestion.file_write_patterns),
        (" Commands (allow):\n", suggestion.cmd_patterns),
        # Hosts are shown sorted and without repeats.
        (" Network hosts (allow):\n", sorted(set(suggestion.network_hosts))),
    )
    for heading, entries in sections:
        if not entries:
            continue
        emit(heading)
        for entry in entries:
            emit(f" {entry}\n")
        emit("\n")

    if dry_run:
        emit("Generated policy (dry run):\n\n")
        emit(json.dumps(render_policy_json(suggestion), indent=2))
        emit("\n\n")
271+
272+
273+
# ---------------------------------------------------------------------------
274+
# CLI handler
275+
# ---------------------------------------------------------------------------
276+
277+
def cmd_policy(args: argparse.Namespace) -> int:
    """CLI handler for ``agent-strace policy``.

    Reads ``trace_dir``, ``session_ids``, ``output`` and ``dry_run`` from
    *args*. With no session ids, every stored session is analysed. Session
    ids are resolved through ``store.find_session`` before use.

    The suggestion is printed (including the generated JSON) when
    ``--dry-run`` is set or no ``--output`` was given; otherwise the policy
    JSON is written to the output path.

    Returns:
        0 on success, 1 when no sessions exist or a session id cannot be
        resolved (a message is written to stderr in both cases).
    """
    store = TraceStore(args.trace_dir)

    # Read the options up front: they were previously referenced before being
    # assigned, which raised UnboundLocalError as soon as any session existed.
    output_path = getattr(args, "output", None)
    # Default: dry-run when no --output given, so nothing is overwritten
    # silently.
    dry_run = bool(getattr(args, "dry_run", False)) or not output_path

    session_ids_raw: list[str] = getattr(args, "session_ids", []) or []

    # If no sessions specified, use all sessions
    if not session_ids_raw:
        all_sessions = store.list_sessions()
        if not all_sessions:
            sys.stderr.write("No sessions found.\n")
            return 1
        session_ids_raw = [s.session_id for s in all_sessions]

    # Resolve prefixes
    resolved: list[str] = []
    for sid in session_ids_raw:
        full = store.find_session(sid)
        if not full:
            sys.stderr.write(f"Session not found: {sid}\n")
            return 1
        resolved.append(full)

    suggestion = suggest_policy(store, resolved)

    if dry_run:
        format_suggestion(suggestion, dry_run=True)
        return 0

    policy_dict = render_policy_json(suggestion)
    out_path = Path(output_path)
    out_path.write_text(json.dumps(policy_dict, indent=2) + "\n")
    sys.stdout.write(f"Policy written to {out_path}\n")
    return 0

0 commit comments

Comments
 (0)