Skip to content

Commit 2bd13a4

Browse files
Blanquitohclaude
andcommitted
feat(enforce-lsp-over-grep): telemetry log + regex hardening + quoted-pattern FP fix
- _log_block() disk-only JSONL telemetry with 256KB rotation + secret redaction
- POS_CODE_FILE_RE trailing boundary includes |>;& for pipe/redirect forms
- _strip_quoted() preprocessor prevents FP on code ext inside quoted pattern
- --selftest CLI flag with 8 regex cases
- strip user-specific mcp__ollama-filter__ollama_filter_call branch (MCP-agnostic repo; user-scope branch lives in ~/.claude/hooks/)
- tests: +_log_block (3), escape-quote ANSI-C/locale (2), rotation (2)

Co-authored-by: Claude <noreply@anthropic.com>
1 parent c89ab19 commit 2bd13a4

2 files changed

Lines changed: 175 additions & 66 deletions

File tree

hooks/enforce-lsp-over-grep.py

Lines changed: 68 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,36 @@
1818
HOME = Path(os.environ.get("HOME", str(Path.home())))
1919
AVAIL_FILE = HOME / ".claude" / "locks" / "lsp-availability.json"
2020
PLUGINS_FILE = HOME / ".claude" / "plugins" / "installed_plugins.json"
21+
METRICS_LOG = HOME / ".claude" / ".metrics" / "lsp-grep-blocks.log"
22+
23+
24+
def _log_block(payload: dict, tool_name: str, pattern_excerpt: str, reason: str) -> None:
    """Append one JSONL entry describing a blocked call to METRICS_LOG.

    Disk-only: never emits to stdout/stderr/additionalContext (zero-token
    invariant). Best-effort: any failure is swallowed so that telemetry can
    never raise into the caller.

    Args:
        payload: hook payload; only ``session_id`` is read (defaults to "").
        tool_name: name of the tool whose call was blocked.
        pattern_excerpt: command/pattern text. Logged as "[REDACTED]" when it
            looks like it carries a secret, otherwise truncated to 80 chars.
        reason: short tag explaining why the call was blocked.
    """
    try:
        from datetime import datetime, timezone

        # Rotate when oversized: rename to .log.1 (overwriting any prior
        # backup) and start fresh. A missing log or a failed rename must not
        # stop the entry below from being written.
        try:
            if METRICS_LOG.stat().st_size > 256 * 1024:
                os.replace(str(METRICS_LOG), str(METRICS_LOG) + ".1")
        except OSError:
            pass

        raw = pattern_excerpt or ""
        # Scan the FULL text before truncating. If the 80-char cut came first,
        # a long token straddling the cut would shrink below the 40-char
        # threshold and its leading characters would be written to disk.
        looks_secret = (
            re.search(r"(?i)(api[_-]?key|secret|token|password|bearer)\s*[:=]", raw)
            or re.search(r"[A-Za-z0-9+/=]{40,}", raw)
        )
        excerpt = "[REDACTED]" if looks_secret else raw[:80]

        entry = {
            "ts": datetime.now(timezone.utc).isoformat(),
            "session_id": payload.get("session_id", ""),
            "tool_name": tool_name,
            "pattern_excerpt": excerpt,
            "reason": reason,
        }
        METRICS_LOG.parent.mkdir(parents=True, exist_ok=True)
        with open(METRICS_LOG, "a", encoding="utf-8") as f:
            f.write(json.dumps(entry) + "\n")
    except Exception:
        # deliberate silent-pass: telemetry is optional and must never raise
        pass
2151

2252
# plugin + binary fallback when avail file lacks an entry (prewarm hasn't run for this cwd yet)
2353
PLUGIN_BINARY_MAP = {
@@ -132,7 +162,15 @@ def lang_info_fallback(lang: str) -> Optional[dict]:
132162
RG_TYPE_RE = re.compile(r"""--type[=\s](\w+)""")
133163
RG_GLOB_RE = re.compile(r"""-g[=\s]['"]*\*(\.\w+)['"]*""")
134164
# positional code-file argument to grep/rg: "grep pattern path/to/file.scala"
135-
POS_CODE_FILE_RE = re.compile(r"""(?:^|\s)['"]?[^\s'"|&;<>]*(\.(?:scala|sbt|sc|py|ts|tsx|js|jsx|cs|vue|java))['"]?(?:\s|$)""")
165+
# trailing boundary includes pipe/redirect/semicolon/ampersand to catch
166+
# `grep "x" /a/b.ts | head` and `grep "x" /a/b.ts > out` and `grep "x" /a/b.ts;echo`
167+
POS_CODE_FILE_RE = re.compile(r"""(?:^|\s)['"]?[^\s'"|&;<>]*(\.(?:scala|sbt|sc|py|ts|tsx|js|jsx|cs|vue|java))['"]?(?:\s|$|[|>;&])""")
168+
_QUOTED_RE = re.compile(r"""(?:"(?:\\.|[^"\\])*"|'(?:\\.|[^'\\])*'|\$'(?:\\.|[^'\\])*'|\$"(?:\\.|[^"\\])*")""")
169+
170+
171+
def _strip_quoted(cmd: str) -> str:
172+
"""remove shell-quoted substrings before POS_CODE_FILE_RE match — avoids FP on `grep "foo.ts" /a/b.md` where `.ts` is inside the search pattern, not a target filename"""
173+
return _QUOTED_RE.sub(" ", cmd)
136174

137175

138176
def lsp_suggestion(lang: str, avail: dict) -> Optional[str]:
@@ -234,7 +272,7 @@ def detect_langs(cmd: str) -> set:
234272
langs.add(EXT_LANG[ext])
235273
# positional code-file argument (e.g. `grep foo path/to/file.scala`)
236274
if tool_head in {"grep", "egrep", "fgrep", "rg", "ack", "ag"}:
237-
for m in POS_CODE_FILE_RE.finditer(cmd):
275+
for m in POS_CODE_FILE_RE.finditer(_strip_quoted(cmd)):
238276
ext = m.group(1).lower()
239277
if ext in EXT_LANG:
240278
langs.add(EXT_LANG[ext])
@@ -329,30 +367,13 @@ def main() -> None:
329367
" - targeting non-source files → add --include='*.<ext>' (e.g. *.sql, *.md, *.properties) or -g '*.<ext>' for rg\n"
330368
" - scoped to a known non-code subtree → recurse inside conf/, docs/, .claude/, i18n/, etc. explicitly\n"
331369
)
370+
_log_block(payload, tool_name, cmd, "unscoped_recursive_grep_bash")
332371
sys.exit(2)
333372
langs = detect_langs(cmd)
334373
source = "Bash: " + cmd[:200] + ("..." if len(cmd) > 200 else "")
335374
elif tool_name == "Grep":
336375
langs = detect_langs_native_grep(inp)
337376
source = f"Grep(pattern={inp.get('pattern','')[:40]}, type={inp.get('type')}, glob={inp.get('glob')}, path={inp.get('path')})"
338-
elif tool_name == "mcp__ollama-filter__ollama_filter_call":
339-
# filter_bash bypass: catches grep/rg/find wrapped in the ollama compression layer
340-
sub_tool = (inp.get("tool") or "").strip()
341-
sub_args = inp.get("args") or {}
342-
if sub_tool != "filter_bash":
343-
sys.exit(0)
344-
cmd = sub_args.get("command", "") if isinstance(sub_args, dict) else ""
345-
if not cmd or not is_search_tool(cmd):
346-
sys.exit(0)
347-
if is_unscoped_recursive_grep(cmd):
348-
sys.stderr.write(
349-
"BLOCKED by enforce-lsp-over-grep: unscoped recursive grep/rg (via ollama_filter_call)\n"
350-
f"command: {cmd[:300]}{'...' if len(cmd) > 300 else ''}\n"
351-
"declare intent: <lang>-direct wrapper for source code, or --include='*.<ext>' for non-source.\n"
352-
)
353-
sys.exit(2)
354-
langs = detect_langs(cmd)
355-
source = "ollama_filter_call(filter_bash): " + cmd[:200] + ("..." if len(cmd) > 200 else "")
356377
else:
357378
sys.exit(0)
358379
if not langs:
@@ -381,8 +402,35 @@ def main() -> None:
381402
f"{source}\n\n"
382403
)
383404
sys.stderr.write(header + "\n\n".join(msgs) + "\n")
405+
if block:
406+
_log_block(payload, tool_name, source, f"lsp_available_langs:{','.join(sorted(langs))}")
384407
sys.exit(2 if block else 0)
385408

386409

410+
def _selftest() -> int:
    """Run the inline regex self-test for the POS_CODE_FILE_RE pipe/redirect boundary.

    Each case is checked with POS_CODE_FILE_RE.search(_strip_quoted(cmd)) and
    one OK/FAIL line is printed per case.
    Run with: `python3 enforce-lsp-over-grep.py --selftest`

    Returns:
        0 when every case matches its expectation, 1 otherwise.
    """
    table = (
        ('grep "x" /a/b.ts', True),                     # plain positional code file
        ('grep "x" /a/b.ts | head -20', True),          # pipe boundary
        ('grep "x" /a/b.ts > out', True),               # redirect boundary
        ('grep "x" /a/b.md', False),                    # non-source extension
        ('grep "foo.ts" /a/b.md', False),               # code ext INSIDE quoted pattern, target is .md
        ('grep "pattern.ts;literal" file.md', False),   # ext + separator inside quoted pattern
        ('grep "x" /a/b.ts;echo done', True),           # real .ts file then ;echo separator
        ("grep 'foo.py' /a/b.md", False),               # single-quoted pattern variant
    )
    all_passed = True
    for command, want in table:
        got = POS_CODE_FILE_RE.search(_strip_quoted(command)) is not None
        agrees = got == want
        verdict = "OK" if agrees else "FAIL"
        print(f"{verdict}: {command!r} matched={got} expected={want}")
        all_passed = all_passed and agrees
    return 0 if all_passed else 1
431+
432+
387433
if __name__ == "__main__":
    # `--selftest` as the first CLI argument runs the regex self-check and
    # exits with its status; otherwise fall through to the hook entry point.
    selftest_requested = sys.argv[1:2] == ["--selftest"]
    if selftest_requested:
        sys.exit(_selftest())
    main()

hooks/tests/test_enforce_lsp_over_grep.py

Lines changed: 107 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -291,52 +291,6 @@ def test_bash_positional_non_code_passes(fake_home):
291291
assert rc == 0
292292

293293

294-
# ---------- ollama-filter filter_bash bypass ----------
295-
296-
297-
def _ollama(sub_tool: str, **args) -> dict:
298-
return {
299-
"hook_event_name": "PreToolUse",
300-
"tool_name": "mcp__ollama-filter__ollama_filter_call",
301-
"tool_input": {"tool": sub_tool, "args": args},
302-
}
303-
304-
305-
@pytest.mark.parametrize("cmd,lang", [
306-
('grep -rn foo ~/x --include="*.scala"', "scala"),
307-
('rg --type python bar ~/x', "python"),
308-
('find ~/x -name "*.ts"', "typescript"),
309-
('grep -n foo /tmp/Foo.cs', "csharp"),
310-
])
311-
def test_ollama_filter_bash_bypass_blocked(fake_home, cmd, lang):
312-
_write_availability(fake_home, {"lsps": {
313-
"scala": {"tool":"metals-direct","binary":"/x","backend":"metals-mcp","workspace":"/w"},
314-
"python": {"tool":"claude-lsp","plugin_installed":True,"binary_on_path":True,"binary_name":"pyright-langserver","workspace":"/w"},
315-
"typescript": {"tool":"claude-lsp","plugin_installed":True,"binary_on_path":True,"binary_name":"typescript-language-server","workspace":"/w"},
316-
"csharp": {"tool":"claude-lsp","plugin_installed":True,"binary_on_path":True,"binary_name":"csharp-ls","workspace":"/w"},
317-
}})
318-
rc, _, err = _run(_ollama("filter_bash", command=cmd), fake_home)
319-
assert rc == 2
320-
assert lang in err
321-
assert "ollama_filter_call" in err
322-
323-
324-
@pytest.mark.parametrize("sub_tool,args", [
325-
("search_memory", {"query": "foo"}),
326-
("search_repo", {"query": "foo", "repo_path": "/x"}),
327-
("filter_read", {"path": "/tmp/Foo.scala"}),
328-
("filter_webfetch", {"url": "https://x/y"}),
329-
("filter_bash", {"command": "docker ps"}),
330-
("filter_bash", {"command": "git log --oneline"}),
331-
])
332-
def test_ollama_filter_non_grep_passes(fake_home, sub_tool, args):
333-
_write_availability(fake_home, {"lsps": {
334-
"scala": {"tool":"metals-direct","binary":"/x","backend":"metals-mcp","workspace":"/w"},
335-
}})
336-
rc, _, _ = _run(_ollama(sub_tool, **args), fake_home)
337-
assert rc == 0
338-
339-
340294
# ---------- non-bash non-grep ----------
341295

342296

@@ -363,5 +317,112 @@ def test_empty_command_passes(fake_home):
363317
assert rc == 0
364318

365319

320+
# ---------- _log_block telemetry ----------
321+
322+
323+
@pytest.fixture
def hook_module():
    """Load the hook script at HOOK_PATH as a module named `enforce_lsp_over_grep`."""
    from importlib import util as importlib_util

    module_spec = importlib_util.spec_from_file_location("enforce_lsp_over_grep", HOOK_PATH)
    loaded = importlib_util.module_from_spec(module_spec)
    loader = module_spec.loader
    assert loader is not None
    # executing the module runs its top-level code, so its globals are populated
    loader.exec_module(loaded)
    return loaded
331+
332+
333+
def test_log_block_writes_jsonl_entry(hook_module, tmp_path, monkeypatch):
    """One call appends exactly one JSON line carrying every expected field."""
    target = tmp_path / "log.jsonl"
    monkeypatch.setattr(hook_module, "METRICS_LOG", target)

    hook_module._log_block({"session_id": "abc"}, "Bash", "grep x /a/b.ts", "positional")

    written = target.read_text().splitlines()
    assert len(written) == 1
    record = json.loads(written[0])
    assert "ts" in record
    assert record["session_id"] == "abc"
    assert record["tool_name"] == "Bash"
    assert "grep x" in record["pattern_excerpt"]
    assert record["reason"] == "positional"
345+
346+
347+
def test_log_block_redacts_secrets(hook_module, tmp_path, monkeypatch):
    """Both a `key=` style token and a long alphanumeric run are logged as [REDACTED]."""
    target = tmp_path / "log.jsonl"
    monkeypatch.setattr(hook_module, "METRICS_LOG", target)

    def excerpt_logged_for(text):
        # log one entry, read it back, then empty the file so the next
        # call's entry is again the first line
        hook_module._log_block({"session_id": "s"}, "Bash", text, "x")
        first = json.loads(target.read_text().splitlines()[0])
        target.write_text("")
        return first["pattern_excerpt"]

    assert excerpt_logged_for("api_key=AKIA1234") == "[REDACTED]"
    # 45-char alphanumeric string
    assert excerpt_logged_for("A" * 45) == "[REDACTED]"
358+
359+
360+
def test_log_block_silent_pass_on_io_error(hook_module, tmp_path, monkeypatch):
    """_log_block swallows filesystem errors, returns None, and writes nothing."""
    # The log path's parent is a regular FILE, so mkdir()/open() fail for every
    # user, root included. A hard-coded path directly under "/" would be
    # created successfully when the suite runs as root, which leaves the error
    # path untested and pollutes the filesystem.
    blocker = tmp_path / "not-a-dir"
    blocker.write_text("")
    bad_log = blocker / "log.jsonl"
    monkeypatch.setattr(hook_module, "METRICS_LOG", bad_log)
    # MUST NOT raise
    result = hook_module._log_block({"session_id": "s"}, "Bash", "grep x /a/b.ts", "x")
    assert result is None
    assert not bad_log.exists()
365+
366+
367+
def test_log_block_rotates_when_oversized(hook_module, tmp_path, monkeypatch):
    """A log above the size limit is moved to `.1` and a fresh log receives the new entry."""
    active = tmp_path / "lsp-grep-blocks.log"
    rotated = tmp_path / "lsp-grep-blocks.log.1"
    # pre-fill with 260 KB of dummy content
    filler = "x" * (260 * 1024)
    active.write_text(filler)
    monkeypatch.setattr(hook_module, "METRICS_LOG", active)

    hook_module._log_block({"session_id": "s"}, "Bash", "grep x /a/b.ts", "rot")

    # the active log is small again and holds only the new entry
    fresh_text = active.read_text()
    assert len(fresh_text) < 1024, f"expected small fresh log, got {len(fresh_text)} bytes"
    assert '"reason": "rot"' in fresh_text
    # the backup exists and holds the previous oversized content
    assert rotated.exists()
    assert rotated.read_text() == filler
382+
383+
384+
def test_log_block_overwrites_old_rotation(hook_module, tmp_path, monkeypatch):
    """Rotation replaces an existing `.1` backup with the current oversized log."""
    active = tmp_path / "lsp-grep-blocks.log"
    rotated = tmp_path / "lsp-grep-blocks.log.1"
    current_payload = "y" * (260 * 1024)
    previous_backup = "STALE-OLD-BACKUP"
    active.write_text(current_payload)
    rotated.write_text(previous_backup)
    monkeypatch.setattr(hook_module, "METRICS_LOG", active)

    hook_module._log_block({"session_id": "s"}, "Bash", "grep x /a/b.ts", "rot")

    # the backup now holds what the active log had before rotation;
    # the stale backup content is gone
    backup_text = rotated.read_text()
    assert backup_text == current_payload
    assert "STALE" not in backup_text
396+
397+
398+
# ---------- backslash/single-quote escape in positional regex ----------
399+
400+
401+
def test_strip_quoted_handles_backslash_escape_inside_double_quote(hook_module):
    """A backslash-escaped double quote inside a double-quoted pattern does not end the span early."""
    detected = hook_module.detect_langs(r'''grep "it\"s a .ts" /a/b.md''')
    assert detected == set(), f"expected no lang detection, got {detected}"
405+
406+
407+
def test_strip_quoted_handles_single_quote_literal(hook_module):
    """A double quote inside a single-quoted pattern is literal; `.ts` there is not a target file."""
    detected = hook_module.detect_langs('''grep 'a"b.ts' /a/b.md''')
    assert detected == set(), f"expected no lang detection, got {detected}"
411+
412+
413+
def test_strip_quoted_handles_ansi_c_dollar_single(hook_module):
    """bash ANSI-C quoting ($'...'): `.ts` is inside the pattern, the target is .md."""
    detected = hook_module.detect_langs(r"""grep $'foo.ts\n' /a/b.md""")
    assert detected == set(), f"expected no lang detection, got {detected}"
418+
419+
420+
def test_strip_quoted_handles_locale_dollar_double(hook_module):
    """Locale-translation quoting ($"..."): `.ts` is inside the pattern, the target is .md."""
    detected = hook_module.detect_langs('''grep $"x.ts" /a/b.md''')
    assert detected == set(), f"expected no lang detection, got {detected}"
425+
426+
366427
if __name__ == "__main__":
    # running the file directly hands it to pytest in verbose mode and
    # propagates pytest's exit status
    exit_status = pytest.main([__file__, "-v"])
    sys.exit(exit_status)

0 commit comments

Comments
 (0)