Skip to content

Commit a4959a5

Browse files
committed
Address review feedback: hash chain docs, _get_last_hash perf, restore arch comment
1 parent 1ca6095 commit a4959a5

2 files changed

Lines changed: 31 additions & 16 deletions

File tree

plugins/cortex-code/scripts/router/execute_cortex.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,12 @@ def check_cortex_cli() -> bool:
6464
return False
6565

6666

67+
# Prompt-level security envelope instructions.
68+
# Hard enforcement happens through `--permission-prompt-tool stdio`: cortex
69+
# emits a control_request for every tool call and this wrapper replies via
70+
# envelope_policy.decide(). The prompt text below is a soft hint so the LLM
71+
# shapes its plan to the envelope (fewer denied tool calls, cleaner UX). Hard
72+
# gate is the policy function -- the LLM cannot talk its way past it.
6773
ENVELOPE_INSTRUCTIONS = {
6874
"RO": (
6975
"# Security Envelope: READ-ONLY\n"

plugins/cortex-code/scripts/router/security/audit_logger.py

Lines changed: 25 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -81,34 +81,43 @@ def log_execution(
8181
return audit_id
8282

8383
def _get_last_hash(self) -> str:
84-
"""Read the hash of the last log entry for chain continuity."""
84+
"""Read the hash of the last log entry for chain continuity.
85+
86+
Reads up to 8KB from the end of the file to find the last complete
87+
JSON line, avoiding byte-by-byte seeking on large files.
88+
"""
8589
if not self.log_path.exists() or self.log_path.stat().st_size == 0:
8690
return "GENESIS"
8791

8892
try:
8993
with open(self.log_path, 'rb') as f:
9094
f.seek(0, 2)
91-
pos = f.tell()
92-
if pos == 0:
93-
return "GENESIS"
94-
# Read backwards to find last complete line
95-
buf = b''
96-
while pos > 0:
97-
pos -= 1
98-
f.seek(pos)
99-
char = f.read(1)
100-
if char == b'\n' and buf:
101-
break
102-
buf = char + buf
103-
if buf:
104-
last_entry = json.loads(buf)
95+
size = f.tell()
96+
# Read last 8KB (more than enough for one audit entry)
97+
read_size = min(size, 8192)
98+
f.seek(size - read_size)
99+
chunk = f.read(read_size)
100+
101+
# Find the last complete line
102+
lines = chunk.split(b'\n')
103+
# Walk backwards to find last non-empty line
104+
for line in reversed(lines):
105+
line = line.strip()
106+
if line:
107+
last_entry = json.loads(line)
105108
return last_entry.get("entry_hash", "GENESIS")
106109
except (json.JSONDecodeError, OSError, KeyError):
107110
pass
108111
return "GENESIS"
109112

110113
def _write_entry(self, entry: Dict[str, Any]) -> None:
111-
"""Write entry with hash chain linking to previous entry."""
114+
"""Write entry with hash chain linking to previous entry.
115+
116+
Verification algorithm: to verify entry N, strip 'entry_hash' from
117+
the dict, serialize with sort_keys=True, and SHA-256 the result.
118+
Compare against the stored entry_hash. Then verify entry N's
119+
prev_hash matches entry N-1's entry_hash.
120+
"""
112121
prev_hash = self._get_last_hash()
113122
entry["prev_hash"] = prev_hash
114123

0 commit comments

Comments
 (0)