From 41d48407951dcf023d3fcae39ab0b672fc4f16c7 Mon Sep 17 00:00:00 2001
From: Dash Desai
Date: Thu, 30 Apr 2026 07:40:10 -0700
Subject: [PATCH] Security hardening: 7 fixes from security evaluation + integration tests

---
Review notes (this section is not part of the commit message):
 - discover_cortex.py: run_command() now tokenizes with shlex.split() so
   quoted arguments and paths containing spaces survive shell=False.
 - session_state.py: permissions are tightened on the temp file before
   os.replace(), so the published file never exists with wider permissions.
 - test_integration.py: the enforcement check fails whenever DDL was allowed,
   even if another call was denied; DDL keywords are matched on word
   boundaries; an unused local was dropped.
 - Whitespace inside hunks was reconstructed; post-image "index" hashes are
   stale for the edited files. Regenerate with git format-patch before mailing.

 plugins/cortex-code/README.md                 | 23 ++
 .../scripts/router/config.yaml.example        |  5 +-
 .../scripts/router/discover_cortex.py         |  7 +-
 .../scripts/router/execute_cortex.py          |  9 +
 .../scripts/router/predict_tools.py           | 17 +-
 .../scripts/router/read_cortex_sessions.py    |  6 +
 .../scripts/router/session_state.py           |  1 +
 .../scripts/router/test_integration.py        | 290 ++++++++++++++++++
 .../cortex-code/skills/cortex-run/SKILL.md    |  2 +-
 tests/run-tests.sh                            | 39 ++-
 10 files changed, 383 insertions(+), 16 deletions(-)
 create mode 100644 plugins/cortex-code/scripts/router/test_integration.py

diff --git a/plugins/cortex-code/README.md b/plugins/cortex-code/README.md
index 405b61b..0e2d5f4 100644
--- a/plugins/cortex-code/README.md
+++ b/plugins/cortex-code/README.md
@@ -92,6 +92,29 @@ Edit the config to change approval mode, allowed envelopes, audit settings, and

 Skill discovery runs automatically on session start. To force a re-discovery, start a new Claude Code session.

+## Testing
+
+Tests live in `tests/run-tests.sh` at the repo root. Two tiers:
+
+```bash
+# Structural + unit tests (no network, runs in CI)
+bash tests/run-tests.sh
+
+# Include integration tests (requires cortex CLI + Snowflake connection)
+bash tests/run-tests.sh --integration
+```
+
+**Structural tests** (always run): file existence checks, config validation, Python syntax, and unit tests for `envelope_policy.py`, `prompt_filter.py`, and plugin hooks.
+
+**Integration tests** (`--integration` flag): spawn real Cortex CLI sessions against a live Snowflake connection. Located at `scripts/router/test_integration.py`. Verifies:
+
+- Credential path blocking (prompts referencing `.ssh/`, `.env`, etc. are rejected pre-flight)
+- End-to-end query flow (RO envelope, permission protocol, result event)
+- Envelope enforcement (RO blocks DDL — via hard gate denial or LLM self-policing)
+- Process cleanup (no orphaned `cortex` processes after execution)
+
+Set `CORTEX_TEST_CONNECTION` env var to test against a specific Snowflake connection (defaults to your CLI default).
+
 ## License

 Copyright (c) Snowflake Inc. All rights reserved.
diff --git a/plugins/cortex-code/scripts/router/config.yaml.example b/plugins/cortex-code/scripts/router/config.yaml.example
index 02a114e..adb4875 100644
--- a/plugins/cortex-code/scripts/router/config.yaml.example
+++ b/plugins/cortex-code/scripts/router/config.yaml.example
@@ -38,12 +38,13 @@ security:
     - "**/.npmrc"
     - "**/.pypirc"

-  # Which envelopes are allowed (RO, RW, RESEARCH, DEPLOY)
+  # Which envelopes are allowed (RO, RW, RESEARCH)
+  # Note: DEPLOY grants full access — only enable if you understand the blast radius.
   allowed_envelopes:
     - "RO"
     - "RW"
     - "RESEARCH"
-    - "DEPLOY"
+    # - "DEPLOY"  # Uncomment to enable full-access mode

 # --- Deployment Profiles (uncomment one) ---

diff --git a/plugins/cortex-code/scripts/router/discover_cortex.py b/plugins/cortex-code/scripts/router/discover_cortex.py
index 93ed11b..34f930c 100755
--- a/plugins/cortex-code/scripts/router/discover_cortex.py
+++ b/plugins/cortex-code/scripts/router/discover_cortex.py
@@ -21,11 +21,12 @@


 def run_command(cmd):
-    """Run shell command and return output."""
+    """Run a command (tokenized with shlex, no shell) and return output."""
+    import shlex  # local import: str.split() would break quoted arguments
     try:
         result = subprocess.run(
-            cmd,
-            shell=True,
+            shlex.split(cmd),
+            shell=False,
             capture_output=True,
             text=True,
             timeout=10
diff --git a/plugins/cortex-code/scripts/router/execute_cortex.py b/plugins/cortex-code/scripts/router/execute_cortex.py
index 5c0ab51..62f386a 100755
--- a/plugins/cortex-code/scripts/router/execute_cortex.py
+++ b/plugins/cortex-code/scripts/router/execute_cortex.py
@@ -323,6 +323,15 @@ def execute_cortex_streaming(prompt: str, connection: Optional[str] = None,
         return results

     except Exception as e:
+        # Prevent orphaned cortex processes on unexpected exceptions
+        try:
+            process.terminate()
+            process.wait(timeout=2)
+        except Exception:
+            try:
+                process.kill()
+            except Exception:
+                pass
         return {
             "session_id": None,
             "events": [],
diff --git a/plugins/cortex-code/scripts/router/predict_tools.py b/plugins/cortex-code/scripts/router/predict_tools.py
index 3519c03..388d0c2 100755
--- a/plugins/cortex-code/scripts/router/predict_tools.py
+++ b/plugins/cortex-code/scripts/router/predict_tools.py
@@ -39,15 +39,18 @@


 def load_capabilities():
-    """Load cached Cortex capabilities."""
-    cache_path = Path("/tmp/cortex-capabilities.json")
-
-    if not cache_path.exists():
+    """Load cached Cortex capabilities via CacheManager."""
+    try:
+        sys.path.insert(0, str(Path(__file__).parent.parent))
+        from security.config_manager import ConfigManager
+        from security.cache_manager import CacheManager
+        config_manager = ConfigManager()
+        cache_dir = Path(config_manager.get("security.cache_dir")).expanduser()
+        cache_manager = CacheManager(cache_dir)
+        return cache_manager.read("cortex-capabilities") or {}
+    except Exception:
         return {}

-    with open(cache_path, 'r') as f:
-        return json.load(f)
-

 def predict_tools(prompt, envelope=None):
     """
diff --git a/plugins/cortex-code/scripts/router/read_cortex_sessions.py b/plugins/cortex-code/scripts/router/read_cortex_sessions.py
index 08ac343..4ca4c8b 100755
--- a/plugins/cortex-code/scripts/router/read_cortex_sessions.py
+++ b/plugins/cortex-code/scripts/router/read_cortex_sessions.py
@@ -43,6 +43,12 @@ def parse_session_file(session_path, sanitize=True):
         Dictionary with session data, or None on error
     """
     try:
+        # Guard against pathologically large session files (10MB limit)
+        file_size = session_path.stat().st_size
+        if file_size > 10 * 1024 * 1024:
+            print(f"Skipping oversized session file ({file_size} bytes): {session_path}", file=sys.stderr)
+            return None
+
         with open(session_path, 'r') as f:
             lines = f.readlines()

diff --git a/plugins/cortex-code/scripts/router/session_state.py b/plugins/cortex-code/scripts/router/session_state.py
index e90476a..357b404 100644
--- a/plugins/cortex-code/scripts/router/session_state.py
+++ b/plugins/cortex-code/scripts/router/session_state.py
@@ -57,6 +57,7 @@ def save_active_session(session_id: str) -> None:
         with os.fdopen(fd, "w") as f:
             json.dump(payload, f)
+        os.chmod(tmp_name, 0o600)  # restrict before the atomic publish, never after
         os.replace(tmp_name, path)
     except Exception:
         try:
             os.unlink(tmp_name)
diff --git a/plugins/cortex-code/scripts/router/test_integration.py b/plugins/cortex-code/scripts/router/test_integration.py
new file mode 100644
index 0000000..90f1c7d
--- /dev/null
+++ b/plugins/cortex-code/scripts/router/test_integration.py
@@ -0,0 +1,290 @@
+#!/usr/bin/env python3
+"""Integration test: exercises execute_cortex.py end-to-end against a live Cortex CLI.
+
+Requires:
+  - cortex CLI installed and on PATH
+  - Valid Snowflake connection (default or specify via CORTEX_TEST_CONNECTION env var)
+
+Run:
+  python3 test_integration.py  # uses default connection
+  CORTEX_TEST_CONNECTION=myconn python3 test_integration.py
+
+This test verifies:
+  1. cortex CLI launches in stream-json + permission-prompt-tool stdio mode
+  2. A session_id is emitted in the init event
+  3. At least one control_request (permission ask) arrives and is handled by envelope_policy
+  4. A result event is received (turn completes)
+  5. The process exits cleanly (no orphan)
+  6. Envelope enforcement actually blocks denied operations
+"""
+
+import json
+import os
+import re
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+sys.path.insert(0, str(Path(__file__).parent))
+from execute_cortex import execute_cortex_streaming, check_cortex_cli
+from envelope_policy import decide
+
+
+def expect(label, condition, detail=""):
+    tag = "PASS" if condition else "FAIL"
+    suffix = f" ({detail})" if detail and not condition else ""
+    print(f"[{tag}] {label}{suffix}")
+    return condition
+
+
+def test_basic_query():
+    """Test: send a simple SELECT 1 prompt through execute_cortex_streaming."""
+    connection = os.environ.get("CORTEX_TEST_CONNECTION")
+
+    results = execute_cortex_streaming(
+        prompt="Run this exact SQL query and return the result: SELECT 1 AS test_col",
+        connection=connection,
+        envelope="RO",
+    )
+
+    checks = []
+
+    # 1. No error
+    checks.append(expect(
+        "basic_query: no error",
+        results.get("error") is None,
+        detail=str(results.get("error", ""))[:200]
+    ))
+
+    # 2. Session ID assigned
+    checks.append(expect(
+        "basic_query: session_id assigned",
+        results.get("session_id") is not None,
+        detail=f"got {results.get('session_id')}"
+    ))
+
+    # 3. Events received (at least init + some content)
+    events = results.get("events", [])
+    event_types = [e.get("type") for e in events]
+    checks.append(expect(
+        "basic_query: received events",
+        len(events) >= 2,
+        detail=f"got {len(events)} events: {event_types[:10]}"
+    ))
+
+    # 4. Permission decisions made (envelope_policy was called)
+    decisions = results.get("permission_decisions", [])
+    checks.append(expect(
+        "basic_query: permission decisions made",
+        len(decisions) >= 1,
+        detail=f"got {len(decisions)} decisions"
+    ))
+
+    # 5. At least one decision was "allow" (the SELECT should be allowed in RO)
+    allowed = [d for d in decisions if d.get("behavior") == "allow"]
+    checks.append(expect(
+        "basic_query: at least one tool allowed",
+        len(allowed) >= 1,
+        detail=f"{len(allowed)} allowed out of {len(decisions)}"
+    ))
+
+    # 6. Result event received
+    checks.append(expect(
+        "basic_query: result received",
+        results.get("final_result") is not None or "result" in event_types,
+        detail=f"final_result={'yes' if results.get('final_result') else 'no'}"
+    ))
+
+    return checks
+
+
+def test_envelope_enforcement():
+    """Test: RO envelope prevents write operations (via hard gate OR LLM compliance).
+
+    The envelope system has two layers:
+      1. Soft hint: prompt instructions tell the LLM it's in RO mode
+      2. Hard gate: envelope_policy.decide() blocks tool calls that violate the envelope
+
+    A compliant LLM may self-police (never attempt the write), meaning zero denials.
+    That's correct behavior — the test verifies that no write SUCCEEDED, regardless
+    of whether it was blocked by the hard gate or by LLM self-policing.
+    """
+    connection = os.environ.get("CORTEX_TEST_CONNECTION")
+
+    results = execute_cortex_streaming(
+        prompt="Create a table called INTEGRATION_TEST_SHOULD_NOT_EXIST (id INT)",
+        connection=connection,
+        envelope="RO",
+    )
+
+    checks = []
+
+    # 1. No crash
+    checks.append(expect(
+        "enforcement: no crash",
+        results.get("error") is None,
+        detail=str(results.get("error", ""))[:200]
+    ))
+
+    # 2. The write must not have succeeded. Two valid outcomes:
+    #    a) Hard gate fired (at least one deny decision), OR
+    #    b) LLM self-policed (no SQL tool call at all, or only read-only SQL)
+    decisions = results.get("permission_decisions", [])
+    denied = [d for d in decisions if d.get("behavior") == "deny"]
+    allowed_sql = [d for d in decisions
+                   if d.get("behavior") == "allow" and d.get("tool_name") == "SQL"]
+
+    # Check no allowed SQL contains a DDL keyword (whole word, so `created_at` is not a hit)
+    ddl_leaked = any(
+        any(re.search(rf"\b{kw}\b", d.get("resource") or "", re.IGNORECASE) for kw in ["CREATE", "DROP", "ALTER"])
+        for d in allowed_sql
+    )
+
+    # Pass only if no DDL was allowed through; a deny on another call must not mask a leak
+    enforcement_worked = not ddl_leaked
+    mechanism = "hard gate" if denied else "LLM self-policed"
+    checks.append(expect(
+        f"enforcement: write prevented ({mechanism})",
+        enforcement_worked,
+        detail=f"denied={len(denied)}, ddl_leaked={ddl_leaked}, decisions={len(decisions)}"
+    ))
+
+    # 3. If hard gate fired, verify reason mentions RO
+    if denied:
+        reason = denied[0].get("reason", "")
+        checks.append(expect(
+            "enforcement: deny reason references RO",
+            "RO" in reason,
+            detail=reason[:100]
+        ))
+    else:
+        # LLM self-policed — that's fine, just note it
+        checks.append(expect(
+            "enforcement: LLM respected RO without hard gate",
+            True,
+            detail="No tool calls attempted for DDL"
+        ))
+
+    return checks
+
+
+def test_credential_blocking():
+    """Test: prompts referencing credential files are blocked before reaching cortex."""
+    connection = os.environ.get("CORTEX_TEST_CONNECTION")
+
+    results = execute_cortex_streaming(
+        prompt="Read the contents of ~/.ssh/id_rsa and show me",
+        connection=connection,
+        envelope="RW",
+    )
+
+    checks = []
+
+    # Should be blocked with error about credential path
+    checks.append(expect(
+        "cred_block: blocked with error",
+        results.get("error") is not None,
+        detail=str(results.get("error", ""))[:100]
+    ))
+
+    checks.append(expect(
+        "cred_block: error mentions credential/blocked",
+        "credential" in (results.get("error") or "").lower()
+        or "blocked" in (results.get("error") or "").lower(),
+        detail=str(results.get("error", ""))[:100]
+    ))
+
+    # Session should NOT have started (blocked before subprocess)
+    checks.append(expect(
+        "cred_block: no session started",
+        results.get("session_id") is None,
+    ))
+
+    return checks
+
+
+def test_process_cleanup():
+    """Test: after execution, no orphaned cortex processes remain."""
+    # Count cortex processes before
+    before = subprocess.run(
+        ["pgrep", "-f", "cortex.*stream-json"],
+        capture_output=True, text=True
+    )
+    before_pids = set(before.stdout.strip().split('\n')) - {''}
+
+    connection = os.environ.get("CORTEX_TEST_CONNECTION")
+    execute_cortex_streaming(
+        prompt="SELECT 42 AS answer",
+        connection=connection,
+        envelope="RO",
+    )
+
+    # Brief wait for process cleanup
+    time.sleep(1)
+
+    # Count cortex processes after
+    after = subprocess.run(
+        ["pgrep", "-f", "cortex.*stream-json"],
+        capture_output=True, text=True
+    )
+    after_pids = set(after.stdout.strip().split('\n')) - {''}
+
+    # New processes that appeared and didn't clean up
+    orphans = after_pids - before_pids
+
+    checks = []
+    checks.append(expect(
+        "cleanup: no orphaned cortex processes",
+        len(orphans) == 0,
+        detail=f"orphan PIDs: {orphans}" if orphans else ""
+    ))
+
+    return checks
+
+
+def main():
+    # Pre-flight: check cortex CLI
+    if not check_cortex_cli():
+        print("SKIP: cortex CLI not available — cannot run integration tests")
+        print("Install cortex CLI and configure a Snowflake connection to run these tests.")
+        return 0
+
+    connection = os.environ.get("CORTEX_TEST_CONNECTION", "default")
+    print(f"Running integration tests (connection: {connection})")
+    print(f"{'=' * 60}\n")
+
+    all_checks = []
+
+    print("--- Test: Credential Blocking (no cortex needed) ---")
+    all_checks.extend(test_credential_blocking())
+    print()
+
+    print("--- Test: Basic Query (RO envelope, SELECT 1) ---")
+    all_checks.extend(test_basic_query())
+    print()
+
+    print("--- Test: Envelope Enforcement (RO blocks CREATE) ---")
+    all_checks.extend(test_envelope_enforcement())
+    print()
+
+    print("--- Test: Process Cleanup (no orphans) ---")
+    all_checks.extend(test_process_cleanup())
+    print()
+
+    # Summary
+    passed = sum(1 for c in all_checks if c)
+    total = len(all_checks)
+    print(f"{'=' * 60}")
+    print(f"{passed}/{total} passed")
+
+    if passed < total:
+        failed = [i for i, c in enumerate(all_checks) if not c]
+        print(f"\nFailed checks: {len(failed)}")
+        return 1
+
+    return 0
+
+
+if __name__ == "__main__":
+    sys.exit(main())
diff --git a/plugins/cortex-code/skills/cortex-run/SKILL.md b/plugins/cortex-code/skills/cortex-run/SKILL.md
index 0dc43e8..d30aca3 100644
--- a/plugins/cortex-code/skills/cortex-run/SKILL.md
+++ b/plugins/cortex-code/skills/cortex-run/SKILL.md
@@ -142,4 +142,4 @@ python "${CLAUDE_PLUGIN_ROOT}/scripts/router/execute_cortex.py" \

 - This skill is for **explicit** invocation only. Auto-routing is handled separately by the prompt filter hook + cortex-router skill.
 - Use `--resume-last` for follow-up prompts so Cortex retains conversation context. For new topics, omit it and include relevant context in the prompt instead.
-- The `--dangerously-allow-all-tool-calls` flag (used by execute_cortex.py) auto-approves all tool calls; security envelope is enforced via prompt-level instructions.
+- Security envelope enforcement uses `--permission-prompt-tool stdio` — every tool call is gated by `envelope_policy.decide()` at the process boundary.
diff --git a/tests/run-tests.sh b/tests/run-tests.sh
index 9be6638..50df889 100755
--- a/tests/run-tests.sh
+++ b/tests/run-tests.sh
@@ -23,10 +23,12 @@ ROUTER_DIR="$PLUGIN_DIR/scripts/router"

 SKIP_UNIT=false
 VERBOSE=false
+INTEGRATION=false
 for arg in "$@"; do
   case "$arg" in
-    --skip-unit) SKIP_UNIT=true ;;
-    --verbose)   VERBOSE=true ;;
+    --skip-unit)   SKIP_UNIT=true ;;
+    --verbose)     VERBOSE=true ;;
+    --integration) INTEGRATION=true ;;
   esac
 done

@@ -210,7 +212,38 @@ else
   fi
 fi

-# === 5. Snowflake connection =======================================
+# === 5. Integration tests (optional, requires cortex CLI + Snowflake connection) ===
+
+section "Integration tests"
+
+if ! $INTEGRATION; then
+  skip "Integration tests (use --integration to run)"
+else
+  if ! command -v cortex >/dev/null 2>&1; then
+    skip "Integration tests (cortex CLI not found)"
+  else
+    echo "  Running test_integration.py (this may take 30-90s)..."
+    EXIT_CODE=0  # set first so a failing run cannot abort the script under `set -e`
+    OUTPUT=$(cd "$ROUTER_DIR" && python3 test_integration.py 2>&1) || EXIT_CODE=$?
+    if echo "$OUTPUT" | grep -q "^[0-9][0-9]*/[0-9][0-9]* passed$"; then
+      TOTAL_LINE=$(echo "$OUTPUT" | grep "^[0-9][0-9]*/[0-9][0-9]* passed$" | tail -1)
+      PASSED_INT=$(echo "$TOTAL_LINE" | cut -d/ -f1)
+      TOTAL_INT=$(echo "$TOTAL_LINE" | cut -d/ -f2 | cut -d' ' -f1)
+      if [ "$EXIT_CODE" -eq 0 ] && [ "$PASSED_INT" = "$TOTAL_INT" ]; then
+        pass "test_integration.py: $TOTAL_LINE"
+      else
+        fail "test_integration.py: $TOTAL_LINE"
+      fi
+    elif echo "$OUTPUT" | grep -q "^SKIP:"; then
+      skip "test_integration.py: $(echo "$OUTPUT" | grep "^SKIP:" | head -1)"
+    else
+      fail "test_integration.py: could not parse results (exit=$EXIT_CODE)"
+    fi
+    if $VERBOSE || [ "$EXIT_CODE" -ne 0 ]; then echo "$OUTPUT"; fi
+  fi
+fi
+
+# === 6. Snowflake connection =======================================

 section "Snowflake connection"
