Skip to content

Commit 33553da

Browse files
author
Hanson Mei
committed
refactor(codex): use direct API calls and add file locking
- Replace subprocess nmem CLI calls with direct nmem_cli API imports - Add fcntl-based file locking to prevent race conditions - Implement atomic state file writes with temp file + os.replace - Add thread existence check and use create vs append endpoints - Filter malformed messages before sync - Add rollback mechanism in refresh_thread_titles on failure - Update tests to mock nmem_cli modules instead of subprocess - Add AGENTS.md with project agent collaboration guidelines
1 parent ed150a9 commit 33553da

4 files changed

Lines changed: 394 additions & 130 deletions

File tree

AGENTS.md

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
# Nowledge Community — Agent Guidelines
2+
3+
## Registry
4+
5+
[`integrations.json`](integrations.json) is the **single source of truth** for all Nowledge Mem integrations. It tracks capabilities, versions, install commands, transport, tool naming, and thread save methods.
6+
7+
**When adding or modifying any integration, update `integrations.json` first.** Other surfaces (website `integrations.ts`, desktop app integrations view, README tables, marketplace JSONs) derive from or validate against this file.
8+
9+
The desktop app fetches this file at runtime from `https://raw.githubusercontent.com/nowledge-co/community/main/integrations.json` for plugin update awareness. Changes to the schema (adding/removing/renaming fields) affect:
10+
- **Rust** (`lib.rs`): `fetch_plugin_registry`, `detect_installed_plugins`, `write_plugin_update_state`
11+
- **TypeScript** (`plugin-update-manager.ts`): `RegistryIntegration` interface
12+
- **Python** (`health.py`): `_read_plugin_update_state` reader
13+
14+
## Behavioral Guidance
15+
16+
[`shared/behavioral-guidance.md`](shared/behavioral-guidance.md) defines when plugins should search, save, read Working Memory, and distill. All plugins should align with this shared guidance.
17+
18+
## Plugin Development
19+
20+
See [`docs/PLUGIN_DEVELOPMENT_GUIDE.md`](docs/PLUGIN_DEVELOPMENT_GUIDE.md) for authoring rules, directory layout, and testing expectations.
21+
22+
## Submodules
23+
24+
`nowledge-mem-gemini-cli` is a nested submodule (separate repo with its own release cycle). All other integrations are normal directories in this repo.
25+
26+
## Commit Workflow
27+
28+
When modifying this repo as a submodule of the parent `muscat` repo:
29+
1. Commit inside `community/` first
30+
2. Then stage the updated submodule reference in the parent repo

nowledge-mem-codex-plugin/hooks/nmem-stop-save.py

Lines changed: 122 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,23 +1,26 @@
11
#!/usr/bin/env python3
22
from __future__ import annotations
33

4+
import fcntl
45
import hashlib
56
import json
67
import os
78
import re
8-
import shutil
9-
import subprocess
109
import sys
1110
import tempfile
11+
import time
12+
from contextlib import contextmanager
1213
from datetime import datetime, timezone
1314
from pathlib import Path
15+
from urllib.parse import quote
1416

1517

1618
HOME = Path.home()
1719
CODEX_DIR = HOME / ".codex"
1820
LOG_FILE = CODEX_DIR / "log" / "nowledge-mem-stop-hook.log"
1921
STATE_FILE = CODEX_DIR / "nowledge_mem_codex_hook_state.json"
20-
IMPORT_TIMEOUT_SECONDS = 30
22+
STATE_LOCK_FILE = CODEX_DIR / "nowledge_mem_codex_hook_state.lock"
23+
LOCK_TIMEOUT_SECONDS = 2.0
2124

2225

2326
def ensure_parent(path: Path) -> None:
@@ -35,14 +38,51 @@ def load_json(path: Path) -> dict:
3538
if not path.exists():
3639
return {}
3740
try:
38-
return json.loads(path.read_text(encoding="utf-8"))
41+
payload = json.loads(path.read_text(encoding="utf-8"))
42+
return payload if isinstance(payload, dict) else {}
3943
except Exception:
4044
return {}
4145

4246

4347
def save_json(path: Path, payload: dict) -> None:
    """Atomically write *payload* as pretty-printed JSON to *path*.

    The payload is first serialized to a NamedTemporaryFile in the
    destination directory (same filesystem), then moved over *path* with
    os.replace(), so concurrent readers never observe a partially written
    state file.

    Raises whatever json.dump or the filesystem raises; the temporary
    file is always cleaned up on failure.
    """
    ensure_parent(path)
    temp_path: Path | None = None
    try:
        with tempfile.NamedTemporaryFile(
            "w",
            delete=False,
            dir=path.parent,
            encoding="utf-8",
        ) as handle:
            # Record the temp name BEFORE dumping: if json.dump raises,
            # the finally block below can still remove the orphaned temp
            # file (the original assigned temp_path only after a
            # successful dump, leaking the file on serialization errors).
            temp_path = Path(handle.name)
            json.dump(payload, handle, ensure_ascii=False, indent=2, sort_keys=True)
        os.replace(temp_path, path)
        # Replacement succeeded: the temp name no longer exists, so skip cleanup.
        temp_path = None
    finally:
        if temp_path is not None:
            temp_path.unlink(missing_ok=True)
64+
65+
@contextmanager
def state_lock(
    lock_path: Path = STATE_LOCK_FILE,
    timeout_seconds: float = LOCK_TIMEOUT_SECONDS,
):
    """Serialize access to the hook state file with an fcntl advisory lock.

    Polls a non-blocking exclusive flock on *lock_path* every 50 ms until it
    is acquired or *timeout_seconds* elapses, then yields with the lock held
    and releases it on exit.

    Raises TimeoutError if the lock cannot be acquired within the deadline.
    """
    ensure_parent(lock_path)
    with lock_path.open("a+", encoding="utf-8") as handle:
        deadline = time.monotonic() + timeout_seconds
        acquired = False
        while not acquired:
            try:
                fcntl.flock(handle.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
                acquired = True
            except BlockingIOError:
                if time.monotonic() >= deadline:
                    raise TimeoutError(f"timed out waiting for lock {lock_path}")
                time.sleep(0.05)
        try:
            yield
        finally:
            fcntl.flock(handle.fileno(), fcntl.LOCK_UN)
4686

4787

4888
def configure_nmem_env() -> None:
@@ -56,18 +96,6 @@ def configure_nmem_env() -> None:
5696
os.environ["NMEM_API_KEY"] = str(api_key)
5797

5898

59-
def resolve_nmem_command() -> list[str] | None:
60-
nmem_bin = shutil.which("nmem") or shutil.which("nmem.cmd")
61-
if nmem_bin:
62-
return [nmem_bin]
63-
64-
uvx_bin = shutil.which("uvx") or shutil.which("uvx.cmd")
65-
if uvx_bin:
66-
return [uvx_bin, "--from", "nmem-cli", "nmem"]
67-
68-
return None
69-
70-
7199
def shorten_title(text: str, limit: int = 60) -> str:
72100
normalized = text.replace("\n", " ").strip()
73101
if len(normalized) <= limit:
@@ -168,23 +196,22 @@ def import_current_transcript(payload: dict) -> tuple[int, str]:
168196
return 0, f"skip: transcript_path missing or unreadable for session={session_id}"
169197
log(f"transcript={transcript_path}")
170198

171-
nmem_command = resolve_nmem_command()
172-
if not nmem_command:
173-
return 0, "skip: neither nmem nor uvx was found in PATH"
174-
199+
configure_nmem_env()
175200
try:
201+
from nmem_cli.cli import api_get_optional, api_post
176202
from nmem_cli.session_import import parse_codex_session_streaming
177203
except Exception as exc:
178-
return 0, f"skip: failed to import nmem_cli parser: {exc}"
204+
return 0, f"skip: failed to import nmem_cli modules: {exc}"
179205

180206
try:
181207
parsed = parse_codex_session_streaming(transcript_path, truncate_large_content=True)
182208
except Exception as exc:
183209
return 0, f"skip: failed to parse codex rollout: {exc}"
184210

185211
messages = [
186-
{"role": message.get("role"), "content": message.get("content", "")}
212+
{"role": message["role"], "content": message.get("content", "")}
187213
for message in parsed.get("messages", [])
214+
if isinstance(message, dict) and message.get("role")
188215
]
189216
if not messages:
190217
return 0, f"skip: parsed zero messages for session={session_id}"
@@ -195,66 +222,80 @@ def import_current_transcript(payload: dict) -> tuple[int, str]:
195222
json.dumps(messages, ensure_ascii=False, sort_keys=True).encode("utf-8")
196223
).hexdigest()
197224

198-
state = load_json(STATE_FILE)
199-
previous = state.get(session_id, {})
200-
if (
201-
previous.get("content_hash") == content_hash
202-
and previous.get("message_count") == message_count
203-
):
204-
return 0, f"skip: unchanged transcript for session={session_id}"
205-
206-
import_messages = messages
207-
previous_count = previous.get("message_count")
208-
if (
209-
isinstance(previous_count, int)
210-
and previous_count > 0
211-
and previous_count < message_count
212-
and previous.get("thread_id") == thread_id
213-
and previous.get("source_file") == str(transcript_path)
214-
):
215-
import_messages = messages[previous_count:]
216-
217-
if not import_messages:
218-
return 0, f"skip: no new messages for session={session_id}"
219-
220-
import_payload = {
221-
"title": derive_thread_title(parsed, cwd),
222-
"messages": import_messages,
223-
}
224-
225-
with tempfile.NamedTemporaryFile("w", suffix=".json", delete=False, encoding="utf-8") as handle:
226-
json.dump(import_payload, handle, ensure_ascii=False)
227-
temp_path = Path(handle.name)
225+
title = derive_thread_title(parsed, cwd)
226+
workspace = parsed.get("workspace") or cwd or None
227+
project = Path(cwd).name if cwd else None
228+
metadata = parsed.get("metadata") or {}
228229

229230
try:
230-
command = [*nmem_command, "--json", "t", "import", "-f", str(temp_path), "--id", thread_id, "-s", "codex"]
231-
result = subprocess.run(
232-
command,
233-
capture_output=True,
234-
text=True,
235-
env=os.environ.copy(),
236-
timeout=IMPORT_TIMEOUT_SECONDS,
237-
)
238-
except subprocess.TimeoutExpired:
239-
return 0, f"skip: nmem import timed out after {IMPORT_TIMEOUT_SECONDS}s for thread={thread_id}"
240-
finally:
241-
try:
242-
temp_path.unlink(missing_ok=True)
243-
except Exception:
244-
pass
245-
246-
output = (result.stdout or "") + (result.stderr or "")
247-
if result.returncode == 0:
248-
state[session_id] = {
249-
"content_hash": content_hash,
250-
"message_count": message_count,
251-
"thread_id": thread_id,
252-
"source_file": str(transcript_path),
253-
"updated_at": datetime.now(timezone.utc).isoformat(),
254-
}
255-
save_json(STATE_FILE, state)
256-
257-
return result.returncode, output.strip()
231+
with state_lock():
232+
state = load_json(STATE_FILE)
233+
previous = state.get(session_id, {})
234+
if (
235+
previous.get("content_hash") == content_hash
236+
and previous.get("message_count") == message_count
237+
):
238+
return 0, f"skip: unchanged transcript for session={session_id}"
239+
240+
import_messages = messages
241+
previous_count = previous.get("message_count")
242+
if (
243+
isinstance(previous_count, int)
244+
and previous_count > 0
245+
and previous_count < message_count
246+
and previous.get("thread_id") == thread_id
247+
and previous.get("source_file") == str(transcript_path)
248+
):
249+
import_messages = messages[previous_count:]
250+
251+
if not import_messages:
252+
return 0, f"skip: no new messages for session={session_id}"
253+
254+
encoded_thread_id = quote(thread_id, safe='')
255+
existing_thread = api_get_optional(f"/threads/{encoded_thread_id}")
256+
257+
if existing_thread is None:
258+
api_post(
259+
"/threads",
260+
{
261+
"thread_id": thread_id,
262+
"title": title,
263+
"messages": messages,
264+
"source": "codex",
265+
"project": project,
266+
"workspace": workspace,
267+
"metadata": metadata,
268+
},
269+
)
270+
action = "created"
271+
synced_messages = len(messages)
272+
else:
273+
api_post(
274+
f"/threads/{encoded_thread_id}/append",
275+
{
276+
"messages": import_messages,
277+
"deduplicate": True,
278+
},
279+
)
280+
action = "appended"
281+
synced_messages = len(import_messages)
282+
283+
with state_lock():
284+
state = load_json(STATE_FILE)
285+
state[session_id] = {
286+
"content_hash": content_hash,
287+
"message_count": message_count,
288+
"thread_id": thread_id,
289+
"source_file": str(transcript_path),
290+
"updated_at": datetime.now(timezone.utc).isoformat(),
291+
}
292+
save_json(STATE_FILE, state)
293+
except TimeoutError as exc:
294+
return 0, f"skip: failed to lock state file for session={session_id}: {exc}"
295+
except Exception as exc:
296+
return 0, f"skip: failed to sync thread={thread_id}: {exc}"
297+
298+
return 0, f"{action}: thread={thread_id} synced_messages={synced_messages} total_messages={message_count}"
258299

259300

260301
def main() -> int:

nowledge-mem-codex-plugin/scripts/refresh_thread_titles.py

Lines changed: 58 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,22 +37,59 @@ def get_thread(thread_id: str) -> dict | None:
3737
)
3838

3939

40-
def refresh_thread(thread_id: str, title: str, messages: list[dict], dry_run: bool) -> None:
40+
def build_thread_payload(
    thread_id: str,
    title: str,
    messages: list[dict],
    thread: dict | None = None,
) -> dict:
    """Build the body for a thread-create POST.

    Carries over source/project/workspace/metadata from an existing
    *thread* record when one is given; otherwise falls back to the
    "codex" source with empty metadata. Each message is reduced to its
    role and content keys.
    """
    existing = thread if thread is not None else {}
    payload = {
        "thread_id": thread_id,
        "title": title,
        "source": existing.get("source") or "codex",
        "project": existing.get("project"),
        "workspace": existing.get("workspace"),
        "metadata": existing.get("metadata") or {},
    }
    payload["messages"] = [
        {"role": msg["role"], "content": msg["content"]} for msg in messages
    ]
    return payload
56+
57+
58+
def refresh_thread(
    thread_id: str,
    title: str,
    messages: list[dict],
    dry_run: bool,
    original_title: str | None = None,
    original_messages: list[dict] | None = None,
    thread: dict | None = None,
) -> None:
    """Replace a thread's title/messages by deleting and recreating it.

    In dry-run mode only prints the intended change. Otherwise deletes the
    thread and recreates it via POST /threads; if the recreate fails and the
    original title/messages were supplied, a best-effort rollback recreates
    the original thread before re-raising the failure.

    Raises whatever the create call raises (the caller's error accounting
    depends on seeing the failure).
    """
    if dry_run:
        print(f"DRY RUN refresh {thread_id} -> {title}", flush=True)
        return

    encoded_thread_id = quote(thread_id, safe='')
    replacement_payload = build_thread_payload(thread_id, title, messages, thread=thread)
    cli.api_delete(f"/threads/{encoded_thread_id}")
    try:
        cli.api_post(
            "/threads",
            replacement_payload,
            timeout=IMPORT_TIMEOUT_SECONDS,
        )
    except (Exception, SystemExit):
        # main() explicitly handles SystemExit raised out of this function,
        # which shows cli failures can surface as SystemExit -- a bare
        # `except Exception` would skip the rollback for exactly that case
        # and leave the thread deleted.
        if original_title is not None and original_messages is not None:
            try:
                cli.api_post(
                    "/threads",
                    build_thread_payload(
                        thread_id,
                        original_title,
                        original_messages,
                        thread=thread,
                    ),
                    timeout=IMPORT_TIMEOUT_SECONDS,
                )
            except (Exception, SystemExit):
                # Best-effort rollback: if it also fails, propagate the
                # ORIGINAL failure below rather than masking it.
                pass
        raise
    print(f"REFRESHED {thread_id} -> {title}", flush=True)
5794

5895

@@ -106,7 +143,15 @@ def main() -> int:
106143
continue
107144

108145
try:
109-
refresh_thread(thread_id, desired_title, parsed.get("messages", []), args.dry_run)
146+
refresh_thread(
147+
thread_id,
148+
desired_title,
149+
parsed.get("messages", []),
150+
args.dry_run,
151+
original_title=current_title,
152+
original_messages=thread_payload.get("messages", []),
153+
thread=thread,
154+
)
110155
refreshed += 1
111156
except SystemExit as exc:
112157
errors += 1

0 commit comments

Comments
 (0)