|
| 1 | +#!/usr/bin/env python3 |
| 2 | +""" |
| 3 | +Pre-Tool Semantic Memory Hook: Mid-stream context injection based on thinking blocks. |
| 4 | +
|
| 5 | +This hook implements the game-changing pattern from @josh_ladner: |
| 6 | +1. Extract the last ~1500 chars from the most recent thinking block |
| 7 | +2. Embed this current intent/context |
| 8 | +3. Pull relevant heuristics from vector DB |
| 9 | +4. Inject them synchronously before tool execution |
| 10 | +
|
| 11 | +The result: Self-correcting Claude workflows that stay relevant as context drifts. |
| 12 | +""" |
| 13 | + |
| 14 | +import json |
| 15 | +import sys |
| 16 | +import os |
| 17 | +from pathlib import Path |
| 18 | +from typing import List, Dict, Optional, Any |
| 19 | + |
| 20 | +# Add ELF src to path (works both in dev and installed) |
| 21 | +script_dir = Path(__file__).resolve().parent |
| 22 | +sys.path.insert(0, str(script_dir / ".." / ".." / ".." / "src" / "query")) |
| 23 | + |
| 24 | +# Import semantic search |
| 25 | +try: |
| 26 | + from semantic_search import SemanticSearcher |
| 27 | + SEMANTIC_AVAILABLE = True |
| 28 | +except ImportError: |
| 29 | + SEMANTIC_AVAILABLE = False |
| 30 | + |
| 31 | +# Constants |
| 32 | +THINKING_CHARS = 1500 # Characters to extract from thinking block |
| 33 | + |
| 34 | + |
| 35 | +def get_hook_input() -> dict: |
| 36 | + """Read hook input from stdin.""" |
| 37 | + try: |
| 38 | + return json.load(sys.stdin) |
| 39 | + except (json.JSONDecodeError, IOError, ValueError): |
| 40 | + return {} |
| 41 | + |
| 42 | + |
| 43 | +def read_transcript_thinking(transcript_path: str) -> Optional[str]: |
| 44 | + """ |
| 45 | + Read the most recent thinking block from transcript. |
| 46 | + |
| 47 | + Returns the last ~1500 characters of the most recent thinking content, |
| 48 | + or None if no thinking block found. |
| 49 | + """ |
| 50 | + if not transcript_path or not Path(transcript_path).exists(): |
| 51 | + return None |
| 52 | + |
| 53 | + try: |
| 54 | + thinking_blocks = [] |
| 55 | + |
| 56 | + with open(transcript_path, 'r', encoding='utf-8') as f: |
| 57 | + for line in f: |
| 58 | + line = line.strip() |
| 59 | + if not line: |
| 60 | + continue |
| 61 | + |
| 62 | + try: |
| 63 | + entry = json.loads(line) |
| 64 | + |
| 65 | + # Look for thinking blocks in various formats |
| 66 | + # Format 1: Claude Code's thinking block format |
| 67 | + if entry.get('role') == 'assistant' and entry.get('thinking'): |
| 68 | + thinking_content = entry.get('thinking', '') |
| 69 | + if isinstance(thinking_content, str): |
| 70 | + thinking_blocks.append(thinking_content) |
| 71 | + |
| 72 | + # Format 2: Nested in content |
| 73 | + content = entry.get('content', '') |
| 74 | + if isinstance(content, list): |
| 75 | + for item in content: |
| 76 | + if isinstance(item, dict): |
| 77 | + if item.get('type') == 'thinking': |
| 78 | + thinking_text = item.get('thinking', '') |
| 79 | + if thinking_text: |
| 80 | + thinking_blocks.append(thinking_text) |
| 81 | + # Alternative: thinking in text field |
| 82 | + elif item.get('type') == 'text': |
| 83 | + text = item.get('text', '') |
| 84 | + # Check for explicit thinking markers |
| 85 | + if '<thinking>' in text or 'thinking_block' in text: |
| 86 | + thinking_blocks.append(text) |
| 87 | + |
| 88 | + # Format 3: Direct thinking field at top level |
| 89 | + if entry.get('thinking_block') or entry.get('thinking_content'): |
| 90 | + thinking = entry.get('thinking_block') or entry.get('thinking_content') |
| 91 | + if thinking: |
| 92 | + thinking_blocks.append(str(thinking)) |
| 93 | + |
| 94 | + except json.JSONDecodeError: |
| 95 | + continue |
| 96 | + |
| 97 | + if not thinking_blocks: |
| 98 | + return None |
| 99 | + |
| 100 | + # Get the most recent thinking block |
| 101 | + latest_thinking = thinking_blocks[-1] |
| 102 | + |
| 103 | + # Return last THINKING_CHARS characters |
| 104 | + if len(latest_thinking) > THINKING_CHARS: |
| 105 | + return "..." + latest_thinking[-THINKING_CHARS:] |
| 106 | + return latest_thinking |
| 107 | + |
| 108 | + except Exception as e: |
| 109 | + # Silently fail - don't disrupt workflow |
| 110 | + return None |
| 111 | + |
| 112 | + |
| 113 | +async def get_semantic_heuristics( |
| 114 | + thinking_context: str, |
| 115 | + tool_name: str, |
| 116 | + tool_input: dict |
| 117 | +) -> List[Dict[str, Any]]: |
| 118 | + """ |
| 119 | + Get semantically relevant heuristics based on thinking context. |
| 120 | + |
| 121 | + Uses embedding similarity between current thinking and heuristics DB. |
| 122 | + """ |
| 123 | + if not SEMANTIC_AVAILABLE: |
| 124 | + return [] |
| 125 | + |
| 126 | + try: |
| 127 | + # Initialize semantic searcher |
| 128 | + searcher = await SemanticSearcher.create() |
| 129 | + |
| 130 | + # Create rich query from thinking + tool context |
| 131 | + query = f""" |
| 132 | +{thinking_context} |
| 133 | +
|
| 134 | +Tool: {tool_name} |
| 135 | +Tool Input: {json.dumps(tool_input, default=str)[:500]} |
| 136 | +""".strip() |
| 137 | + |
| 138 | + # Search for relevant heuristics |
| 139 | + results = await searcher.find_relevant_heuristics( |
| 140 | + task=query, |
| 141 | + threshold=0.65, # Slightly lower threshold for mid-stream |
| 142 | + limit=3 # Keep it concise for mid-stream injection |
| 143 | + ) |
| 144 | + |
| 145 | + await searcher.cleanup() |
| 146 | + return results |
| 147 | + |
| 148 | + except Exception as e: |
| 149 | + # Fail silently - don't block workflow |
| 150 | + return [] |
| 151 | + |
| 152 | + |
| 153 | +def format_injection_context(heuristics: List[Dict]) -> str: |
| 154 | + """Format heuristics for injection into Claude's context.""" |
| 155 | + if not heuristics: |
| 156 | + return "" |
| 157 | + |
| 158 | + lines = [ |
| 159 | + "", |
| 160 | + "---", |
| 161 | + "## [Mid-Stream Memory] Relevant Patterns Detected", |
| 162 | + "" |
| 163 | + ] |
| 164 | + |
| 165 | + for h in heuristics: |
| 166 | + rule = h.get('rule', '') |
| 167 | + domain = h.get('domain', 'general') |
| 168 | + confidence = h.get('confidence', 0) * 100 |
| 169 | + is_golden = h.get('is_golden', False) |
| 170 | + |
| 171 | + prefix = "⭐ GOLDEN" if is_golden else f"[{domain}]" |
| 172 | + lines.append(f"- {prefix} {rule} ({confidence:.0f}% confidence)") |
| 173 | + |
| 174 | + lines.extend(["---", ""]) |
| 175 | + return "\n".join(lines) |
| 176 | + |
| 177 | + |
| 178 | +async def main_async(): |
| 179 | + """Async main hook logic.""" |
| 180 | + hook_input = get_hook_input() |
| 181 | + |
| 182 | + tool_name = hook_input.get("tool_name", hook_input.get("tool")) |
| 183 | + tool_input = hook_input.get("tool_input", hook_input.get("input", {})) |
| 184 | + transcript_path = hook_input.get("transcript_path", "") |
| 185 | + |
| 186 | + # Only process investigation/modification tools |
| 187 | + RELEVANT_TOOLS = { |
| 188 | + "Task", "Bash", "Grep", "Read", "Glob", "Edit", "Write", |
| 189 | + "WebFetch", "WebSearch" |
| 190 | + } |
| 191 | + |
| 192 | + is_mcp_tool = tool_name.startswith("mcp__") if tool_name else False |
| 193 | + |
| 194 | + if not tool_name or (tool_name not in RELEVANT_TOOLS and not is_mcp_tool): |
| 195 | + # Not a relevant tool - approve silently |
| 196 | + print(json.dumps({"decision": "approve"})) |
| 197 | + return |
| 198 | + |
| 199 | + # Extract thinking from transcript |
| 200 | + thinking_context = read_transcript_thinking(transcript_path) |
| 201 | + |
| 202 | + if not thinking_context: |
| 203 | + # No thinking block found - fall back to standard behavior |
| 204 | + print(json.dumps({"decision": "approve"})) |
| 205 | + return |
| 206 | + |
| 207 | + # Get semantically relevant heuristics |
| 208 | + heuristics = await get_semantic_heuristics(thinking_context, tool_name, tool_input) |
| 209 | + |
| 210 | + if not heuristics: |
| 211 | + # No relevant heuristics found |
| 212 | + print(json.dumps({"decision": "approve"})) |
| 213 | + return |
| 214 | + |
| 215 | + # Format and inject context |
| 216 | + injection_context = format_injection_context(heuristics) |
| 217 | + |
| 218 | + # Return with additional context injection |
| 219 | + result = { |
| 220 | + "decision": "approve", |
| 221 | + "hookSpecificOutput": { |
| 222 | + "hookEventName": "PreToolUse", |
| 223 | + "additionalContext": injection_context |
| 224 | + } |
| 225 | + } |
| 226 | + |
| 227 | + print(json.dumps(result)) |
| 228 | + |
| 229 | + |
| 230 | +def main(): |
| 231 | + """Sync entry point.""" |
| 232 | + import asyncio |
| 233 | + try: |
| 234 | + asyncio.run(main_async()) |
| 235 | + except Exception as e: |
| 236 | + # Never block - fail open |
| 237 | + print(json.dumps({"decision": "approve"})) |
| 238 | + |
| 239 | + |
| 240 | +if __name__ == "__main__": |
| 241 | + main() |
0 commit comments