
Commit 730e58a

Branimir Rakic and cursoragent authored and committed
fix(agent,cli,scripts): OT-RFC-38 LU-6 — address Codex PR #610 R1
Five correctness bugs raised on PR #610 by Codex round 1, all addressed.

1. memory.ts /catchup fallback now decides per-CG. Previously the `totalInserted === 0` gate aggregated across every requested CG, so a multi-CG catchup where one CG got triples from standard sync would silently skip LU-6 host-catchup for the others. Restructured to iterate CGs serially, fan out to peers in parallel within each CG, and decide host-catchup per CG. The new response shape exposes per-CG breakdown via `perContextGraph` + `hostCatchup.triggeredForContextGraphIds`, with a backward-compatible flat `results` array aggregated per peer.

2. memory.ts /catchup `totalInsertedTriples` now includes the host-catchup applied count. Previously a successful LU-6 recovery looked like a no-op at the top level (the field only counted standard sync inserts). Added a separate `standardInsertedTriples` field for callers that want the pre-fallback figure.

3. host-mode-store.ts seqno recovery from the log tail on cold load. The log is the source of truth (`appendFile` is durable before `persistMeta` runs); a crash in between would let the next append reuse a seqno that was already written, which breaks host-catchup paging (uses strict-greater-than seqno). `loadMeta()` now scans the log tail on first access per CG and takes `max(meta.seqno, lastLogSeqno)`. Reconciled value is persisted so subsequent cold loads skip the scan. New unit test simulates the stale-meta crash and verifies the next append picks the right seqno.

4. dkg-agent.ts reconcileSharedMemoryGossipSubscription — clear `swmHostModeSubscribed` when topic-wide `unsubscribe()` wipes the host listener. `GossipSubManager.unsubscribe(topic)` removes ALL handlers on the topic, not just the member-mode one; on member authorisation revocation that would drop the LU-6 host listener too, and `reconcileSwmHostModeSubscription()` would early-return on the still-set `swmHostModeSubscribed` flag — stranding hosting until restart. Clear the flag before the host-mode reconciler runs so it re-wires.

5. dkg-agent.ts enableSwmHostModeFor now probes `isContextGraphRegisteredOnChain` + `markRegistered` on both first-subscribe and idempotent re-entry. The explicit /host-mode/ subscribe path is the Phase A surface that designates hosting from a topic id alone (no CG metadata required); without this probe the store would stay on the 6h/1MiB pre-registration limits forever, pruning ciphertext from registered CGs much earlier than intended. Extracted the probe-and-mark step as `maybeMarkRegisteredForHostMode()` and reused it from both call sites.

Test harness hardening

- `wait_for_peer_link` helper polls `/api/connections` until a target peer shows up, with a 90s budget. Use it before SCENARIO C and D sender-key sends to avoid pre-existing `send timeout: backoff aborted by overall deadline (20000ms)` flakes after curator restart.
- The flake was not LU-6-related (it pre-dates this PR); the probe just makes the suite reliable without changing the protocol-level timeout.

Validation

- All 4 late-joiner devnet scenarios PASS on a fresh `./scripts/devnet.sh start 6` run. SCENARIO D output now shows `totalInsertedTriples: 1` (was `0` before R1), `standardInsertedTriples: 0`, `hostCatchup.triggeredForContextGraphIds: ["<CG_D>"]`, `perContextGraph: [{...perPeer:[...]}]` — all the new visibility surfaces working.
- `packages/agent/test/swm/host-mode-store.test.ts` — 9 tests pass (8 original + the new seqno-recovery regression).
- `packages/publisher` test suite — 965 passed / 1 skipped.

Co-authored-by: Cursor <cursoragent@cursor.com>
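
Items 1 and 2 above can be illustrated with a minimal sketch. It is not the memory.ts code: the type and function names (`CgSyncResult`, `aggregateGateTriggers`, `cgsNeedingHostCatchup`, `summarize`) are hypothetical, and only the response field names are taken from the commit message. It contrasts the old aggregate gate with a per-CG decision and shows a top-level total that includes the host-catchup applied count.

```typescript
// Illustrative sketch only: these types and helpers are hypothetical,
// not the actual memory.ts implementation.
interface PeerResult { peerId: string; insertedTriples: number }
interface CgSyncResult { contextGraphId: string; perPeer: PeerResult[] }

const sumInserted = (peers: PeerResult[]): number =>
  peers.reduce((sum, p) => sum + p.insertedTriples, 0);

// Old behaviour: a single gate over every requested CG. One CG with
// inserts is enough to suppress host-catchup for all the others.
function aggregateGateTriggers(results: CgSyncResult[]): boolean {
  const totalInserted = results.reduce((sum, r) => sum + sumInserted(r.perPeer), 0);
  return totalInserted === 0;
}

// New behaviour: each CG is judged on its own standard-sync inserts.
function cgsNeedingHostCatchup(results: CgSyncResult[]): string[] {
  return results
    .filter((r) => sumInserted(r.perPeer) === 0)
    .map((r) => r.contextGraphId);
}

// Top-level summary: the total includes host-catchup applied counts,
// while the standard-sync figure is reported separately.
function summarize(results: CgSyncResult[], hostCatchupApplied: Record<string, number>) {
  const standardInsertedTriples = results.reduce((sum, r) => sum + sumInserted(r.perPeer), 0);
  const triggeredForContextGraphIds = Object.keys(hostCatchupApplied);
  const hostApplied = triggeredForContextGraphIds.reduce(
    (sum, cgId) => sum + hostCatchupApplied[cgId], 0);
  return {
    totalInsertedTriples: standardInsertedTriples + hostApplied,
    standardInsertedTriples,
    hostCatchup: { triggeredForContextGraphIds },
  };
}
```

With two CGs where only the first receives triples from standard sync, the aggregate gate never fires, whereas the per-CG decision selects the second CG for host-catchup.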
1 parent efb1e1a commit 730e58a

5 files changed

Lines changed: 322 additions & 82 deletions

packages/agent/src/dkg-agent.ts

Lines changed: 46 additions & 8 deletions
@@ -8389,8 +8389,24 @@ export class DKGAgent {
     const ctx = createOperationContext('system');
     if (!(await this.canUseSharedMemoryForContextGraph(contextGraphId))) {
       if (isRegistered) {
+        // `gossip.unsubscribe()` drops EVERY handler on the topic,
+        // not just the member-mode one. If this core was already
+        // hosting the curated SWM in HOST MODE (LU-6), losing
+        // member authorisation here would also kill the host
+        // listener, and `swmHostModeSubscribed` would still be set
+        // — making `reconcileSwmHostModeSubscription()` early-
+        // return on the next pass and stranding the hosting state
+        // until restart (Codex PR #610 R1 comment 4).
+        //
+        // We work around the topic-wide unsubscribe by clearing
+        // the host-mode subscribed flag here so the immediate
+        // `reconcileSwmHostModeSubscription()` call below will
+        // re-wire the host listener if host mode is still
+        // applicable.
+        const wasHostModeSubscribed = this.swmHostModeSubscribed.has(contextGraphId);
         this.gossip.unsubscribe(swmTopic);
         this.sharedMemoryGossipRegistered.delete(contextGraphId);
+        if (wasHostModeSubscribed) this.swmHostModeSubscribed.delete(contextGraphId);
         this.log.warn(ctx, `SWM gossip unsubscribed for "${contextGraphId}": local node is no longer authorized`);
       } else {
         this.log.warn(ctx, `SWM gossip subscription denied for "${contextGraphId}": local node is not authorized`);
@@ -8503,14 +8519,7 @@ export class DKGAgent {
         });
       });
 
-      // If the CG is already on-chain registered, transition the
-      // store immediately to the larger registered-CG limits.
-      try {
-        const onChainKnown = await this.isContextGraphRegisteredOnChain(contextGraphId);
-        if (onChainKnown) {
-          await this.swmHostModeStore.markRegistered(contextGraphId);
-        }
-      } catch { /* best-effort */ }
+      await this.maybeMarkRegisteredForHostMode(contextGraphId);
 
       this.log.info(
         createOperationContext('system'),
@@ -8900,6 +8909,12 @@ export class DKGAgent {
       return { subscribed: false, alreadySubscribed: false, hostingEnabled: true };
     }
     if (this.swmHostModeSubscribed.has(contextGraphId)) {
+      // Idempotent re-entry: even when the subscription is already
+      // active, re-probe registration state. This handles the
+      // legitimate "CG was unregistered when first subscribed,
+      // operator later registered it on-chain, operator re-calls
+      // /host-mode/subscribe" flow without forcing a daemon restart.
+      await this.maybeMarkRegisteredForHostMode(contextGraphId);
       return { subscribed: false, alreadySubscribed: true, hostingEnabled: true };
     }
     const swmTopic = contextGraphWorkspaceTopic(contextGraphId);
@@ -8914,13 +8929,36 @@ export class DKGAgent {
         );
       });
     });
+    // Codex PR #610 R1 comment 5: a core that only knows the CG by
+    // topic id (the explicit /host-mode/subscribe entrypoint) must
+    // still transition the store to the registered-CG limits as
+    // soon as the on-chain record exists. Without this probe the
+    // store would stay on the 6h/1MiB pre-registration defaults
+    // forever and prune ciphertext from registered CGs much
+    // earlier than intended.
+    await this.maybeMarkRegisteredForHostMode(contextGraphId);
     this.log.info(
       createOperationContext('system'),
       `SWM host-mode subscription explicitly enabled for "${contextGraphId}" via API (role=${this.config.nodeRole ?? 'edge'})`,
     );
     return { subscribed: true, alreadySubscribed: false, hostingEnabled: true };
   }
 
+  /**
+   * Probe on-chain registration and flip the host-mode store's
+   * per-CG cursor to the registered-CG limits when the CG is
+   * already known to the contracts. Safe to call repeatedly and on
+   * unregistered CGs — both branches early-return without touching
+   * the store.
+   */
+  private async maybeMarkRegisteredForHostMode(contextGraphId: string): Promise<void> {
+    if (!this.swmHostModeStore) return;
+    try {
+      const onChainKnown = await this.isContextGraphRegisteredOnChain(contextGraphId);
+      if (onChainKnown) await this.swmHostModeStore.markRegistered(contextGraphId);
+    } catch { /* best-effort; pre-registration defaults stay in place */ }
+  }
+
   /**
    * Receiver handler for `PROTOCOL_SWM_SHARE_ACK`. Extracted into
    * a named method (mirrors `handleSwmUpdate`'s shape) so the
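
The stranded-listener bug fixed in the first hunk of this file can be reproduced in a toy model. The sketch below is an assumption-laden illustration, not the DKGAgent code: `ToyGossip` and `ToyAgent` are hypothetical stand-ins that only mimic the two properties the commit message describes, namely that `unsubscribe(topic)` removes every handler on the topic and that the host-mode reconciler early-returns while its flag is set.

```typescript
// Toy model only: ToyGossip and ToyAgent are hypothetical stand-ins for
// GossipSubManager and DKGAgent, reduced to the behaviour under discussion.
type Handler = (msg: string) => void;

class ToyGossip {
  private handlers = new Map<string, Handler[]>();

  subscribe(topic: string, handler: Handler): void {
    const list = this.handlers.get(topic) ?? [];
    list.push(handler);
    this.handlers.set(topic, list);
  }

  // Topic-wide: removes ALL handlers, whoever registered them.
  unsubscribe(topic: string): void {
    this.handlers.delete(topic);
  }

  handlerCount(topic: string): number {
    return this.handlers.get(topic)?.length ?? 0;
  }
}

class ToyAgent {
  readonly gossip = new ToyGossip();
  readonly hostModeSubscribed = new Set<string>();

  // Early-returns when the flag claims the host listener is already wired.
  reconcileHostMode(topic: string): void {
    if (this.hostModeSubscribed.has(topic)) return;
    this.gossip.subscribe(topic, () => { /* host-mode listener */ });
    this.hostModeSubscribed.add(topic);
  }

  // Member authorisation revoked. `clearFlag` toggles the fix.
  revokeMember(topic: string, clearFlag: boolean): void {
    const wasHostModeSubscribed = this.hostModeSubscribed.has(topic);
    this.gossip.unsubscribe(topic);
    if (clearFlag && wasHostModeSubscribed) this.hostModeSubscribed.delete(topic);
    this.reconcileHostMode(topic);
  }
}
```

Without clearing the flag, the topic ends up with no handlers while the flag still reports a live subscription; with the flag cleared, the reconciler re-registers the host listener on the same pass.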

packages/agent/src/swm/host-mode-store.ts

Lines changed: 60 additions & 6 deletions
@@ -352,21 +352,75 @@ export class SwmHostModeStore {
     return meta.registered ? this.registeredLimits : this.unregisteredLimits;
   }
 
+  /**
+   * Load (and cache) the per-CG metadata. On a cold load the seqno
+   * cursor is reconciled against the actual log file: a crash
+   * between `appendFile` (durable) and `persistMeta` (durable) would
+   * otherwise let the next append reuse the same seqno, which would
+   * break host-catchup paging that uses strict-greater-than seqno.
+   *
+   * The log is the source of truth for what was actually persisted;
+   * the meta file is a cache of the highest-known seqno plus the
+   * `registered` flag. After process start we always trust the log
+   * tail's max seqno over the meta file's cursor if the two disagree
+   * — taking `max(metaSeqno, lastLogSeqno)` guarantees we never
+   * recycle a seqno even if the meta write lost a race to the crash.
+   */
   private async loadMeta(contextGraphId: string): Promise<CgMetaState> {
     const cgKey = this.cgKey(contextGraphId);
     const cached = this.metaCache.get(cgKey);
     if (cached) return cached;
     const metaPath = this.metaPath(contextGraphId);
+    let parsed: CgMetaState | undefined;
     try {
       const txt = await fs.readFile(metaPath, 'utf-8');
-      const parsed = JSON.parse(txt) as CgMetaState;
-      this.metaCache.set(cgKey, parsed);
-      return parsed;
+      parsed = JSON.parse(txt) as CgMetaState;
     } catch {
-      const fresh: CgMetaState = { seqno: 0, registered: false, contextGraphId };
-      this.metaCache.set(cgKey, fresh);
-      return fresh;
+      parsed = undefined;
+    }
+    const logSeqno = await this.recoverLastSeqnoFromLog(contextGraphId);
+    const state: CgMetaState = {
+      seqno: Math.max(parsed?.seqno ?? 0, logSeqno),
+      registered: parsed?.registered ?? false,
+      contextGraphId,
+    };
+    // If the log says more than the meta does, persist the
+    // reconciled cursor so subsequent cold loads don't have to
+    // re-scan the log tail.
+    if (parsed && state.seqno !== parsed.seqno) {
+      await fs.writeFile(metaPath, JSON.stringify(state)).catch(() => { /* best-effort */ });
+    }
+    this.metaCache.set(cgKey, state);
+    return state;
+  }
+
+  /**
+   * Scan the per-CG log tail and return the highest seqno actually
+   * persisted on disk. Reads the whole file (the per-CG cap keeps
+   * this bounded — default 1 MiB unregistered, 64 MiB registered)
+   * and walks frame-by-frame. Returns 0 if no log file exists or
+   * the file is empty/corrupt at the head.
+   */
+  private async recoverLastSeqnoFromLog(contextGraphId: string): Promise<number> {
+    const filePath = this.logPath(contextGraphId);
+    if (!(await fileExists(filePath))) return 0;
+    let buf: Buffer;
+    try {
+      buf = await fs.readFile(filePath);
+    } catch {
+      return 0;
+    }
+    let lastSeqno = 0;
+    let offset = 0;
+    while (offset + ENTRY_HEADER_BYTES <= buf.length) {
+      const seqno = Number(buf.readBigUInt64BE(offset + 8));
+      const len = buf.readUInt32BE(offset + 16);
+      const end = offset + ENTRY_HEADER_BYTES + len;
+      if (end > buf.length) break;
+      if (seqno > lastSeqno) lastSeqno = seqno;
+      offset = end;
     }
+    return lastSeqno;
   }
 
   private async persistMeta(contextGraphId: string, meta: CgMetaState): Promise<void> {
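
The frame walk in `recoverLastSeqnoFromLog` can be exercised in isolation with an in-memory log. The sketch below is a standalone approximation, not the store's code. It assumes a 20-byte header (8 bytes not interpreted here, an 8-byte big-endian seqno at offset 8, a 4-byte big-endian payload length at offset 16); the real `ENTRY_HEADER_BYTES` value and the meaning of the first 8 bytes are not shown in this diff. The helpers `encodeFrame`, `concatFrames`, `lastSeqnoInLog` and `reconcileSeqno` are hypothetical names.

```typescript
// Standalone sketch. ASSUMPTION: the header is 20 bytes; the diff only shows
// that seqno lives at offset 8 (u64 BE) and the payload length at offset 16 (u32 BE).
const HEADER_BYTES = 20;

// Build one frame; the first 8 header bytes are left as zeros.
function encodeFrame(seqno: number, payload: Uint8Array): Uint8Array {
  const frame = new Uint8Array(HEADER_BYTES + payload.length);
  const view = new DataView(frame.buffer);
  view.setUint32(8, Math.floor(seqno / 2 ** 32)); // high 32 bits, big-endian
  view.setUint32(12, seqno >>> 0);                // low 32 bits, big-endian
  view.setUint32(16, payload.length);
  frame.set(payload, HEADER_BYTES);
  return frame;
}

function concatFrames(frames: Uint8Array[]): Uint8Array {
  const out = new Uint8Array(frames.reduce((n, f) => n + f.length, 0));
  let offset = 0;
  for (let i = 0; i < frames.length; i++) {
    out.set(frames[i], offset);
    offset += frames[i].length;
  }
  return out;
}

// Walk frame by frame and return the highest seqno of any complete frame.
function lastSeqnoInLog(log: Uint8Array): number {
  const view = new DataView(log.buffer, log.byteOffset, log.byteLength);
  let last = 0;
  let offset = 0;
  while (offset + HEADER_BYTES <= log.length) {
    const seqno = view.getUint32(offset + 8) * 2 ** 32 + view.getUint32(offset + 12);
    const len = view.getUint32(offset + 16);
    const end = offset + HEADER_BYTES + len;
    if (end > log.length) break; // torn final frame: ignore it
    if (seqno > last) last = seqno;
    offset = end;
  }
  return last;
}

// Cold-load reconciliation: never go below what the log already contains.
function reconcileSeqno(metaSeqno: number | undefined, log: Uint8Array): number {
  return Math.max(metaSeqno ?? 0, lastSeqnoInLog(log));
}
```

Taking the maximum means a stale meta cursor can only be raised, never lowered, so the next append always receives a seqno strictly greater than anything a paging reader may already have seen.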

packages/agent/test/swm/host-mode-store.test.ts

Lines changed: 34 additions & 0 deletions
@@ -125,6 +125,40 @@ describe('SwmHostModeStore', () => {
     await expect(store.append('cg/empty', new Uint8Array(0))).rejects.toThrow(/zero-length/);
   });
 
+  it('recovers seqno from log tail when meta cursor is stale (crash between appendFile and persistMeta)', async () => {
+    const cgId = 'curator/test-7';
+    const limits = { perCgByteCap: 1024 * 1024, ttlMs: 60_000 };
+    const first = new SwmHostModeStore({ dataDir: dir, unregisteredLimits: limits, registeredLimits: limits });
+    await first.append(cgId, new Uint8Array([1]));
+    await first.append(cgId, new Uint8Array([2]));
+    await first.append(cgId, new Uint8Array([3]));
+
+    // Simulate a crash where persistMeta lost a race and the meta
+    // file reports a lower seqno than the log actually contains.
+    // We do this by overwriting the meta file in place with the
+    // stale cursor (seqno=1, but the log has 3 entries).
+    const { promises: fs } = await import('node:fs');
+    const { createHash } = await import('node:crypto');
+    const cgKey = createHash('sha256').update(cgId).digest('base64url');
+    const metaPath = path.join(dir, `${cgKey}.meta`);
+    await fs.writeFile(metaPath, JSON.stringify({ seqno: 1, registered: false, contextGraphId: cgId }));
+
+    // Fresh store instance reads the stale meta + scans the log tail
+    // and reconciles to max(1, 3) = 3. The next append MUST assign
+    // seqno 4, not 2 (which would otherwise collide with the
+    // existing entry).
+    const second = new SwmHostModeStore({ dataDir: dir, unregisteredLimits: limits, registeredLimits: limits });
+    const recovered = await second.getLastSeqno(cgId);
+    expect(recovered).toBe(3);
+    const next = await second.append(cgId, new Uint8Array([4]));
+    expect(next).toBe(4);
+
+    // All four entries are visible via iterate, with unique
+    // strictly-increasing seqnos.
+    const all = await second.iterate(cgId, 0);
+    expect(all.map((e) => e.seqno)).toEqual([1, 2, 3, 4]);
+  });
+
   it('isolates CGs by id', async () => {
     const store = new SwmHostModeStore({
       dataDir: dir,
