diff --git a/packages/agent/src/dkg-agent.ts b/packages/agent/src/dkg-agent.ts index a091bdc4c..57901f6ff 100644 --- a/packages/agent/src/dkg-agent.ts +++ b/packages/agent/src/dkg-agent.ts @@ -64,7 +64,7 @@ import { EVMChainAdapter, NoChainAdapter, enrichEvmError, type EVMAdapterConfig, import { DKGPublisher, PublishHandler, SharedMemoryHandler, UpdateHandler, ChainEventPoller, AccessHandler, AccessClient, PublishJournal, StaleWriteError, - ACKCollector, StorageACKHandler, + ACKCollector, StorageACKHandler, resolvePeersHostingContextGraph, VerifyCollector, VerifyProposalHandler, buildVerificationMetadata, resolveWorkspaceAgentRecipients, computeTripleHashV10 as computeTripleHash, computeFlatKCRootV10 as computeFlatKCRoot, autoPartition, isReservedSubject, computePrivateRootV10 as computePrivateRoot, @@ -14006,6 +14006,23 @@ export class DKGAgent { // negotiation (fast, no error logs on the remote side). return connected; }, + getCorePeersHostingContextGraph: async (cgIdStr: string) => { + // Restrict the candidate set to cores whose latest agent profile + // advertises serving the target CG via `skill:contextGraphsServed`. + // This filters out cores that would otherwise have to throw + // "No data found in SWM graph …" inside their StorageACK handler + // (visible to the publisher as a stream reset). The collector + // warns and falls back to the full set if too few cores match, + // so a stale/incomplete hosting registry never blocks publishes. + const advertised = await resolvePeersHostingContextGraph(this.store, cgIdStr); + if (advertised.length === 0) return []; + const advertisedSet = new Set(advertised); + const peers = this.node.libp2p.getPeers(); + return peers + .map(p => p.toString()) + .filter(id => id !== this.peerId) + .filter(id => advertisedSet.has(id)); + }, verifyIdentity: typeof this.chain.verifyACKIdentity === 'function' ? async (recoveredAddress: string, claimedIdentityId: bigint) => { try { diff --git a/packages/cli/src/daemon/lifecycle.ts b/packages/cli/src/daemon/lifecycle.ts index 954b7e840..1c04a1096 100644 --- a/packages/cli/src/daemon/lifecycle.ts +++ b/packages/cli/src/daemon/lifecycle.ts @@ -54,7 +54,11 @@ const execFileAsync = promisify(execFile); import { enrichEvmError, MockChainAdapter } from '@origintrail-official/dkg-chain'; import { DKGAgent, loadOpWallets } from '@origintrail-official/dkg-agent'; import { computeNetworkId, createOperationContext, DKGEvent, Logger, PayloadTooLargeError, GET_VIEWS, TrustLevel, validateSubGraphName, validateAssertionName, validateContextGraphId, isSafeIri, assertSafeIri, sparqlIri, contextGraphSharedMemoryUri, contextGraphAssertionUri, contextGraphMetaUri } from '@origintrail-official/dkg-core'; -import { findReservedSubjectPrefix, isSkolemizedUri } from '@origintrail-official/dkg-publisher'; +import { + findReservedSubjectPrefix, + isSkolemizedUri, + resolvePeersHostingContextGraph, +} from '@origintrail-official/dkg-publisher'; import { DashboardDB, MetricsCollector, @@ -794,6 +798,25 @@ export async function runDaemonInner( } return allPeers; }, + getCorePeersHostingContextGraph: async (cgIdStr: string) => { + // See `packages/publisher/src/ack-collector.ts` — restricts the + // ACK candidate set to cores whose latest agent profile + // advertises `skill:contextGraphsServed`-ing the target CG, so + // we don't waste a quorum slot on a core that would have to + // throw "No data found in SWM graph …" (visible to us as a + // libp2p stream reset). Fail-soft: collector falls back to the + // unfiltered set if too few cores match. + const advertised = await resolvePeersHostingContextGraph( + agent.store, + cgIdStr, + ); + if (advertised.length === 0) return []; + const advertisedSet = new Set(advertised); + return agent.node.libp2p + .getPeers() + .map((p) => p.toString()) + .filter((id) => id !== agent.peerId && advertisedSet.has(id)); + }, log, }), log, diff --git a/packages/cli/src/publisher-runner.ts b/packages/cli/src/publisher-runner.ts index f836ed86c..ecfaab1ae 100644 --- a/packages/cli/src/publisher-runner.ts +++ b/packages/cli/src/publisher-runner.ts @@ -24,6 +24,15 @@ interface ACKTransportFactory { gossipPublish: (topic: string, data: Uint8Array) => Promise; sendP2P: (peerId: string, protocol: string, data: Uint8Array) => Promise; getConnectedCorePeers: () => string[]; + /** + * Optional hosting filter, threaded straight through to + * `ACKCollectorDeps.getCorePeersHostingContextGraph`. Implementations + * should consult their local agent registry (e.g. via + * `resolvePeersHostingContextGraph` from `@origintrail-official/dkg-publisher`) + * intersected with currently-connected cores. Optional so existing + * embedders that don't supply a hosting registry still work. + */ + getCorePeersHostingContextGraph?: (cgIdStr: string) => string[] | Promise; log?: (message: string) => void; } @@ -328,6 +337,7 @@ function createV10ACKProviderForPublisher( gossipPublish: transport.gossipPublish, sendP2P: transport.sendP2P, getConnectedCorePeers: transport.getConnectedCorePeers, + getCorePeersHostingContextGraph: transport.getCorePeersHostingContextGraph, verifyIdentity: async (recoveredAddress: string, claimedIdentityId: bigint) => chain.verifyACKIdentity!(recoveredAddress, claimedIdentityId), log: transport.log, }); diff --git a/packages/publisher/src/ack-collector.ts b/packages/publisher/src/ack-collector.ts index 23600a274..121f00e04 100644 --- a/packages/publisher/src/ack-collector.ts +++ b/packages/publisher/src/ack-collector.ts @@ -12,6 +12,27 @@ export interface ACKCollectorDeps { gossipPublish: (topic: string, data: Uint8Array) => Promise; sendP2P: (peerId: string, protocol: string, data: Uint8Array) => Promise; getConnectedCorePeers: () => string[]; + /** + * Optional hosting filter. Given the source SWM context-graph UAL, return + * the subset of currently-connected core peers whose published agent + * profile (``) + * advertises hosting it. + * + * When provided, the collector prefers this filtered candidate set — + * cores that don't host the CG would otherwise have to reject the + * StorageACK request mid-stream (most often as `No data found in SWM + * graph ...`), which the publisher sees as a libp2p stream reset and + * has to retry/timeout against. Filtering up front avoids that cost. + * + * Behaviour when fewer than `requiredACKs` peers match: the collector + * logs a warning naming the CG + which connected cores are vs. aren't + * advertising it, and **falls back to the full connected-core set** so + * publishes still proceed when the hosting registry is incomplete or + * stale. This deliberately keeps the path live during discovery races, + * but makes hosting-coverage bugs (see GitHub issue #541) visible in + * the log instead of presenting as opaque ACK timeouts. + */ + getCorePeersHostingContextGraph?: (cgIdStr: string) => string[] | Promise; verifyIdentity?: (recoveredAddress: string, claimedIdentityId: bigint) => Promise; log?: (msg: string) => void; } @@ -32,6 +53,17 @@ export interface ACKCollectionResult { const DEFAULT_REQUIRED_ACKS = 3; const ACK_TIMEOUT_MS = 120_000; const MAX_RETRIES = 3; +/** + * Hard ceiling for the optional `getCorePeersHostingContextGraph` + * lookup. The lookup runs against the local triple store BEFORE the + * `ACK_TIMEOUT_MS` budget begins, so an unbounded await here can block + * a publish indefinitely if the store is under load or the query + * implementation hangs (Codex Review on PR#556). On timeout the + * collector treats the lookup as "no hosting signal" — falling back to + * the legacy single-wave behaviour against all connected cores — + * rather than escalating into a publish failure. + */ +const HOSTING_FILTER_TIMEOUT_MS = 1_500; /** * ACKCollector implements V10 spec §9.0 Phase 3: collecting 3 core node @@ -122,16 +154,98 @@ export class ACKCollector { // that decode payloads as FinalizationMessages, causing decode errors. log(`[ACKCollector] Collecting ACKs via direct P2P (merkleRoot=${ethers.hexlify(merkleRoot).slice(0, 18)}...)`); - const corePeers = this.deps.getConnectedCorePeers(); - if (corePeers.length === 0) { + const allConnected = this.deps.getConnectedCorePeers(); + if (allConnected.length === 0) { throw new Error('ACK collection failed: no connected core peers'); } - if (corePeers.length < REQUIRED_ACKS) { + + // Split connected cores into two waves: + // priorityPeers — advertise hosting the source SWM CG; tried first. + // fallbackPeers — connected but don't advertise; only tried if + // the priority wave can't satisfy quorum. + // + // Cores outside the priority set are NOT a hard gate (a stale or + // missing advertisement on one peer in `priorityPeers` shouldn't + // be able to fail a publish that the rest of the connected pool + // could have satisfied — Codex Review on PR#556 flagged this). + // Wave 2 only fires when wave 1 doesn't reach quorum, so the happy + // path still avoids dialling cores that would just decline / reset + // the stream (the GitHub #541 cost). + let priorityPeers: string[] = allConnected; + let fallbackPeers: string[] = []; + if (this.deps.getCorePeersHostingContextGraph) { + const hostingGraphIdStr = params.swmGraphId && params.swmGraphId.length > 0 + ? params.swmGraphId + : contextGraphIdStr; + let hostingPeers: string[] = []; + try { + const lookupPromise = Promise.resolve( + this.deps.getCorePeersHostingContextGraph(hostingGraphIdStr), + ); + // Bound the local-store lookup so a slow / hung registry query + // can't block the publish before the ACK_TIMEOUT_MS budget even + // begins (Codex Review on PR#556). On timeout we treat the + // result as "no hosting signal" — i.e. fall back to the legacy + // single-wave path against all connected cores. + let timeoutHandle: ReturnType | undefined; + const timeoutSentinel: unique symbol = Symbol('hosting-filter-timeout'); + const timeoutPromise = new Promise(resolve => { + timeoutHandle = setTimeout(() => resolve(timeoutSentinel), HOSTING_FILTER_TIMEOUT_MS); + }); + const settled = await Promise.race([lookupPromise, timeoutPromise]); + if (timeoutHandle) clearTimeout(timeoutHandle); + if (settled === timeoutSentinel) { + log( + `[ACKCollector] hosting-filter lookup did not return within ${HOSTING_FILTER_TIMEOUT_MS}ms for "${hostingGraphIdStr}"; ` + + `dialling all ${allConnected.length} connected cores in a single wave`, + ); + hostingPeers = []; + } else { + hostingPeers = settled; + } + } catch (err) { + const errMsg = err instanceof Error ? err.message : String(err); + log( + `[ACKCollector] hosting-filter lookup failed for "${hostingGraphIdStr}" (${errMsg}); ` + + `dialling all ${allConnected.length} connected cores in a single wave`, + ); + hostingPeers = []; + } + const hostingSet = new Set(hostingPeers); + const matched = allConnected.filter(p => hostingSet.has(p)); + const excluded = allConnected.filter(p => !hostingSet.has(p)); + + if (matched.length === 0) { + // No hosting signal at all — keep the legacy single-wave shape. + log( + `[ACKCollector] hosting filter: no connected cores advertise hosting "${hostingGraphIdStr}"; ` + + `dialling all ${allConnected.length} connected cores in a single wave (expect declines from non-hosting cores).`, + ); + } else { + priorityPeers = matched; + fallbackPeers = excluded; + const includedTag = matched.map(p => p.slice(-8)).join(', '); + const excludedTag = excluded.length > 0 ? excluded.map(p => p.slice(-8)).join(', ') : ''; + log( + `[ACKCollector] hosting filter: priority wave = ${matched.length}/${allConnected.length} cores advertising "${hostingGraphIdStr}" [${includedTag}]; ` + + `fallback wave = ${excluded.length} non-advertising cores [${excludedTag}].`, + ); + if (matched.length < REQUIRED_ACKS) { + log( + `[ACKCollector] WARN: only ${matched.length} connected cores advertise hosting "${hostingGraphIdStr}" (need ${REQUIRED_ACKS}); ` + + `fallback wave will be dialled if priority wave can't satisfy quorum. ` + + `If "${hostingGraphIdStr}" has replicationPolicy=full, the non-advertising cores have a coverage bug (see GitHub issue #541).`, + ); + } + } + } + + if (allConnected.length < REQUIRED_ACKS) { throw new Error( - `ACK collection failed: need ${REQUIRED_ACKS} ACKs but only ${corePeers.length} core peers connected — quorum impossible`, + `ACK collection failed: need ${REQUIRED_ACKS} ACKs but only ${allConnected.length} core peers connected — quorum impossible`, ); } - log(`[ACKCollector] Requesting ACKs from ${corePeers.length} core peers (need ${REQUIRED_ACKS})`); + log(`[ACKCollector] Requesting ACKs from ${allConnected.length} core peers (need ${REQUIRED_ACKS}; priority=${priorityPeers.length}, fallback=${fallbackPeers.length})`); const ackDigest = computePublishACKDigest( chainId, @@ -202,22 +316,41 @@ export class ACKCollector { let quorumResolve: (() => void) | undefined; const quorumPromise = new Promise(resolve => { quorumResolve = resolve; }); + /** + * Dial a wave of peers in parallel, accumulating their ACKs into + * the shared `collected` slot. Stops early when the publisher has + * `REQUIRED_ACKS` distinct (peer, identity) ACKs. + */ + const dialWave = async (peers: string[]): Promise => { + if (peers.length === 0) return; + const promises = peers.map(async (peerId) => { + if (collected.length >= REQUIRED_ACKS) return; + const ack = await requestACK(peerId); + if (ack && !seenPeers.has(ack.peerId) && !seenIdentityIds.has(ack.nodeIdentityId)) { + seenPeers.add(ack.peerId); + seenIdentityIds.add(ack.nodeIdentityId); + collected.push(ack); + if (collected.length >= REQUIRED_ACKS) { + quorumResolve?.(); + } + } + }); + await Promise.race([Promise.allSettled(promises), quorumPromise]); + }; + + let triedPeerCount = 0; await Promise.race([ (async () => { - const promises = corePeers.map(async (peerId) => { - if (collected.length >= REQUIRED_ACKS) return; - const ack = await requestACK(peerId); - if (ack && !seenPeers.has(ack.peerId) && !seenIdentityIds.has(ack.nodeIdentityId)) { - seenPeers.add(ack.peerId); - seenIdentityIds.add(ack.nodeIdentityId); - collected.push(ack); - if (collected.length >= REQUIRED_ACKS) { - quorumResolve?.(); - return; - } - } - }); - await Promise.race([Promise.allSettled(promises), quorumPromise]); + await dialWave(priorityPeers); + triedPeerCount = priorityPeers.length; + if (collected.length < REQUIRED_ACKS && fallbackPeers.length > 0) { + log( + `[ACKCollector] Priority wave settled with ${collected.length}/${REQUIRED_ACKS} ACKs; ` + + `dialling fallback wave (${fallbackPeers.length} non-advertising core(s))`, + ); + await dialWave(fallbackPeers); + triedPeerCount += fallbackPeers.length; + } })(), new Promise((_, reject) => setTimeout(() => reject(new Error(`storage_ack_timeout: only ${collected.length}/${REQUIRED_ACKS} ACKs received within ${ACK_TIMEOUT_MS}ms`)), @@ -229,7 +362,7 @@ export class ACKCollector { if (collected.length < REQUIRED_ACKS) { throw new Error( `storage_ack_insufficient: got ${collected.length}/${REQUIRED_ACKS} valid ACKs. ` + - `Tried ${corePeers.length} core peers.`, + `Tried ${triedPeerCount} core peers.`, ); } diff --git a/packages/publisher/src/hosting-resolver.ts b/packages/publisher/src/hosting-resolver.ts new file mode 100644 index 000000000..75fee1715 --- /dev/null +++ b/packages/publisher/src/hosting-resolver.ts @@ -0,0 +1,105 @@ +import { + contextGraphDataUri, + SYSTEM_CONTEXT_GRAPHS, +} from '@origintrail-official/dkg-core'; +import type { TripleStore } from '@origintrail-official/dkg-storage'; + +/** + * Authoritative named graph that holds agent profile triples. Mirrors + * `AGENT_REGISTRY_GRAPH` in `packages/agent/src/profile.ts`. Profiles + * landing in any other named graph (e.g. a stale snapshot held in a + * developer's test store) MUST NOT be trusted by ACK routing. + */ +export const AGENT_REGISTRY_NAMED_GRAPH: string = contextGraphDataUri(SYSTEM_CONTEXT_GRAPHS.AGENTS); + +/** + * Resolves the set of peer IDs whose published agent profile advertises + * hosting a given context graph. Backs the optional + * `getCorePeersHostingContextGraph` dependency on the ACK collector. + * + * The agent profile shape (built by `packages/agent/src/profile.ts`) is: + * + * ```turtle + * GRAPH { + * > rdf:type dkg:CoreNode . + * > dkg:peerId "" . + * > skill:hostingProfile <…/.well-known/genid/hosting> . + * <…/.well-known/genid/hosting> skill:contextGraphsServed ",,..." + * } + * ``` + * + * `contextGraphsServed` is a single comma-separated literal rather than + * many object terms, so the membership check is a delimiter-aware + * substring test against `,,` to avoid false positives where one + * UAL is a string prefix of another (e.g. `…/repnet` vs. + * `…/repnet-edge-smoke`). + */ +export async function resolvePeersHostingContextGraph( + store: TripleStore, + contextGraphIdStr: string, +): Promise { + if (!contextGraphIdStr) return []; + + const escapeForSparqlString = (raw: string): string => + raw + .replace(/\\/g, '\\\\') + .replace(/"/g, '\\"') + .replace(/\n/g, '\\n') + .replace(/\r/g, '\\r') + .replace(/\t/g, '\\t'); + + const escapedUal = escapeForSparqlString(contextGraphIdStr); + // The query is pinned to the authoritative agent-registry named + // graph (`did:dkg:context-graph:agents`). Codex Review on PR#556 + // flagged that an open `GRAPH ?g` allows ACK routing to trust + // profile triples landing in any named graph (e.g. a copied or + // stale snapshot held elsewhere in the store), which can skew the + // priority wave toward the wrong peers. + // + // The `dkg:CoreNode` constraint is mandatory: edge nodes can also + // emit `skill:contextGraphsServed` (e.g. for join-time discovery), + // and they don't register the StorageACK protocol handler. Without + // this filter the collector would target peers that just stream- + // reset on `/dkg/10.0.0/storage-ack`. `getConnectedCorePeers()` + // already does protocol-discovery-time filtering, but during early + // startup it falls back to "all connected peers", so the role + // constraint must be enforced here too. + const escapedRegistryGraph = escapeForSparqlString(AGENT_REGISTRY_NAMED_GRAPH); + const sparql = ` +PREFIX rdf: +PREFIX dkg: +PREFIX skill: +SELECT DISTINCT ?peerId WHERE { + GRAPH <${escapedRegistryGraph}> { + ?agent rdf:type dkg:CoreNode ; + dkg:peerId ?peerId ; + skill:hostingProfile ?hosting . + ?hosting skill:contextGraphsServed ?served . + BIND(CONCAT(",", STR(?served), ",") AS ?normalizedServed) + BIND(CONCAT(",", "${escapedUal}", ",") AS ?needle) + FILTER(CONTAINS(?normalizedServed, ?needle)) + } +} +`; + + const result = await store.query(sparql); + if (result.type !== 'bindings') return []; + + // Bindings carry the literal in its lexical form including wrapping + // quotes (and an optional ^^ tail). Strip both, mirroring the + // `result.bindings[…]?.['x']?.replace(/^"|"$/g, '')` pattern used + // throughout `packages/agent/src/dkg-agent.ts`. + const stripLiteral = (raw: string): string => + raw + .replace(/^"/, '') + .replace(/"(?:\^\^.*)?$/, ''); + + const peers = new Set(); + for (const row of result.bindings) { + const raw = row['peerId']; + if (typeof raw !== 'string' || raw.length === 0) continue; + const peerId = stripLiteral(raw); + if (peerId.length > 0) peers.add(peerId); + } + return [...peers]; +} diff --git a/packages/publisher/src/index.ts b/packages/publisher/src/index.ts index 2adfbae9c..b66907295 100644 --- a/packages/publisher/src/index.ts +++ b/packages/publisher/src/index.ts @@ -48,6 +48,7 @@ export { type CollectedACK, type ACKCollectionResult, } from './ack-collector.js'; +export { resolvePeersHostingContextGraph } from './hosting-resolver.js'; export { StorageACKHandler, type StorageACKHandlerConfig } from './storage-ack-handler.js'; export { VerifyCollector, diff --git a/packages/publisher/test/v10-ack-edge-cases.test.ts b/packages/publisher/test/v10-ack-edge-cases.test.ts index 9e4ee9139..772e81e03 100644 --- a/packages/publisher/test/v10-ack-edge-cases.test.ts +++ b/packages/publisher/test/v10-ack-edge-cases.test.ts @@ -895,3 +895,450 @@ describe('StorageACKHandler signature format', () => { expect(identityId).toBe(coreIdentityId); }); }); + +// ── ACKCollector hosting filter (GitHub issue #541) ────────────────────── +// +// When a publisher targets a context graph that not every connected core +// hosts, the ACK collector should prefer cores that publicly advertise +// hosting it via ``. Cores that don't host the CG +// would otherwise have to throw inside their StorageACK handler (most +// often `No data found in SWM graph …`) and the publisher sees that as a +// libp2p stream reset, not as a graceful "I can't ACK this" signal. +// +// The filter doesn't behave as a hard gate — Codex Review on PR#556 +// flagged that a stale advertisement on one matched peer could block a +// publish the rest of the connected pool could have satisfied. Wave- +// based dial: priority wave (matched cores) is dialled first; the +// fallback wave (the rest) is only dialled if the priority wave can't +// satisfy quorum on its own. + +describe('ACKCollector hosting filter (#541)', () => { + it('uses only the priority wave when it satisfies quorum (no fallback dialled)', async () => { + const sendP2P = tracked(buildSendP2P()); + const log = noop(); + const hostingFilter = tracked((_cgId: string) => ['peer-1', 'peer-2', 'peer-4']); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2', 'peer-3', 'peer-4'], + getCorePeersHostingContextGraph: hostingFilter, + log, + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + expect(hostingFilter.calls).toHaveLength(1); + expect(hostingFilter.calls[0][0]).toBe(testCGIdStr); + + const targeted = sendP2P.calls.map((c) => c[0] as string).sort(); + expect(targeted).toEqual(['peer-1', 'peer-2', 'peer-4']); + + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('priority wave = 3/5'), + )).toBe(true); + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('fallback wave'), + )).toBe(true); + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('Priority wave settled'), + )).toBe(false); + }); + + it('uses swmGraphId, not target contextGraphIdStr, for hosting lookup on remap publishes', async () => { + const sendP2P = tracked(buildSendP2P()); + const log = noop(); + const sourceSwmGraphId = '0x8fb6dcd4/source-swm'; + const hostingFilter = tracked((cgId: string) => + cgId === sourceSwmGraphId ? ['peer-1', 'peer-2', 'peer-4'] : [], + ); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2', 'peer-3', 'peer-4'], + getCorePeersHostingContextGraph: hostingFilter, + log, + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams({ + swmGraphId: sourceSwmGraphId, + })); + + expect(result.acks).toHaveLength(3); + expect(hostingFilter.calls).toHaveLength(1); + expect(hostingFilter.calls[0][0]).toBe(sourceSwmGraphId); + + const targeted = sendP2P.calls.map((c) => c[0] as string).sort(); + expect(targeted).toEqual(['peer-1', 'peer-2', 'peer-4']); + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes(`advertising "${sourceSwmGraphId}"`), + )).toBe(true); + }); + + it('falls back to non-advertising cores when priority wave cannot satisfy quorum (stale advertisements)', async () => { + // Codex Review on PR#556 flagged this: matched advertisement was a + // hard gate, so a single stale entry could fail the publish. Now the + // collector dials the fallback wave whenever priority falls short. + const advertisedButStale = new Set(['peer-1']); + const sendP2P = tracked(async (peerId: string) => { + // Stale-advertised peer signs an ACK with a bad merkle root so the + // collector rejects it; legacy code would have failed because only + // matched peers were dialled. + if (advertisedButStale.has(peerId)) { + const wrong = new Uint8Array(32).fill(0xff); + const idx = parseInt(peerId.replace('peer-', ''), 10); + const wallet = coreWallets[idx]; + const { r, vs } = await signACK(wallet, testCGId, wrong); + return encodeStorageACK({ + merkleRoot: wrong, + coreNodeSignatureR: r, + coreNodeSignatureVS: vs, + contextGraphId: testCGIdStr, + nodeIdentityId: idx + 1, + }); + } + return buildSendP2P()(peerId); + }); + const log = noop(); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2', 'peer-3', 'peer-4'], + // peer-1 is advertised but stale; peer-3, peer-4 advertise correctly. + getCorePeersHostingContextGraph: () => ['peer-1', 'peer-3', 'peer-4'], + log, + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + + const targeted = new Set(sendP2P.calls.map((c) => c[0] as string)); + expect(targeted.has('peer-1')).toBe(true); + expect(targeted.has('peer-3')).toBe(true); + expect(targeted.has('peer-4')).toBe(true); + // Fallback wave kicked in because priority got 2/3 (peer-1 sent a + // bad-root ACK that was rejected). At least one of peer-0, peer-2 + // must have been dialled to backfill. + expect(targeted.has('peer-0') || targeted.has('peer-2')).toBe(true); + + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('Priority wave settled with 2/3'), + )).toBe(true); + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('dialling fallback wave'), + )).toBe(true); + }); + + it('uses fallback as a single wave with WARN when priority is smaller than requiredACKs', async () => { + const sendP2P = tracked(buildSendP2P()); + const log = noop(); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2', 'peer-3'], + getCorePeersHostingContextGraph: () => ['peer-2'], + log, + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + + const targeted = new Set(sendP2P.calls.map((c) => c[0] as string)); + expect(targeted.size).toBe(4); + + const warnLog = log.calls.find( + (c: unknown[]) => (c[0] as string).includes('WARN: only 1 connected cores advertise hosting'), + ); + expect(warnLog).toBeDefined(); + expect(warnLog && (warnLog[0] as string)).toContain(testCGIdStr); + expect(warnLog && (warnLog[0] as string)).toContain('#541'); + }); + + it('treats empty filter result as "no hosting signal" and dials all connected cores in a single wave', async () => { + const sendP2P = tracked(buildSendP2P()); + const log = noop(); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2'], + getCorePeersHostingContextGraph: () => [], + log, + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('no connected cores advertise hosting'), + )).toBe(true); + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('Priority wave settled'), + )).toBe(false); + }); + + it('treats filter throw as "no hosting signal" and dials all connected cores in a single wave', async () => { + const sendP2P = tracked(buildSendP2P()); + const log = noop(); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2'], + getCorePeersHostingContextGraph: () => { + throw new Error('triple-store unavailable'); + }, + log, + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('hosting-filter lookup failed'), + )).toBe(true); + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('triple-store unavailable'), + )).toBe(true); + }); + + it('handles async filter implementations that resolve to peer ids', async () => { + const sendP2P = tracked(buildSendP2P()); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2', 'peer-3'], + getCorePeersHostingContextGraph: async (cgId: string) => { + await new Promise((r) => setTimeout(r, 1)); + return cgId === testCGIdStr ? ['peer-1', 'peer-2', 'peer-3'] : []; + }, + log: noop(), + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + + const targeted = sendP2P.calls.map((c) => c[0] as string).sort(); + expect(targeted).toEqual(['peer-1', 'peer-2', 'peer-3']); + }); + + it('does NOT block the publish when the hosting-filter lookup hangs (Codex Review on PR#556)', async () => { + // Bound the local-store hosting query so a slow registry can't + // burn the entire publish budget before the collector even starts + // dialling peers. On lookup timeout the collector falls back to + // the single-wave path against all connected cores. + const sendP2P = tracked(buildSendP2P()); + const log = noop(); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2'], + getCorePeersHostingContextGraph: () => new Promise(() => {}), + log, + }; + const collector = new ACKCollector(deps); + + const start = Date.now(); + const result = await collector.collect(buildCollectParams()); + const elapsed = Date.now() - start; + + expect(result.acks).toHaveLength(3); + // Lookup ceiling is 1.5s; with collector and signature work it + // still has to land well under the 120s ACK timeout. + expect(elapsed).toBeLessThan(8_000); + + expect(log.calls.some( + (c: unknown[]) => (c[0] as string).includes('hosting-filter lookup did not return within'), + )).toBe(true); + // All connected cores get dialled (single fallback wave) since + // the timeout means we have no hosting signal. + const targeted = new Set(sendP2P.calls.map((c) => c[0] as string)); + expect(targeted.size).toBe(3); + }); + + it('without the dep, behaviour is identical to today (no filter)', async () => { + const sendP2P = tracked(buildSendP2P()); + const deps: ACKCollectorDeps = { + gossipPublish: noop(), + sendP2P: sendP2P as any, + getConnectedCorePeers: () => ['peer-0', 'peer-1', 'peer-2'], + log: noop(), + }; + const collector = new ACKCollector(deps); + + const result = await collector.collect(buildCollectParams()); + expect(result.acks).toHaveLength(3); + + const targeted = sendP2P.calls.map((c) => c[0] as string).sort(); + expect(targeted).toEqual(['peer-0', 'peer-1', 'peer-2']); + }); +}); + +// ── resolvePeersHostingContextGraph (triple-store helper) ──────────────── + +describe('resolvePeersHostingContextGraph', () => { + const SKILL = 'https://dkg.origintrail.io/skill#'; + const DKG_NS = 'https://dkg.network/ontology#'; + const RDF_TYPE = 'http://www.w3.org/1999/02/22-rdf-syntax-ns#type'; + const REGISTRY_GRAPH = 'did:dkg:context-graph:agents'; + + function buildAgentProfileQuads(args: { + walletOrPeerId: string; + peerId: string; + contextGraphsServed: string; + role?: 'core' | 'edge'; + }): Quad[] { + const entity = `did:dkg:agent:${args.walletOrPeerId}`; + const hosting = `${entity}/.well-known/genid/hosting`; + const role = args.role ?? 'core'; + const roleType = role === 'core' ? `${DKG_NS}CoreNode` : `${DKG_NS}EdgeNode`; + return [ + { subject: entity, predicate: RDF_TYPE, object: `${DKG_NS}Agent`, graph: REGISTRY_GRAPH }, + { subject: entity, predicate: RDF_TYPE, object: roleType, graph: REGISTRY_GRAPH }, + { subject: entity, predicate: `${DKG_NS}nodeRole`, object: `"${role}"`, graph: REGISTRY_GRAPH }, + { subject: entity, predicate: `${DKG_NS}peerId`, object: `"${args.peerId}"`, graph: REGISTRY_GRAPH }, + { subject: entity, predicate: `${SKILL}hostingProfile`, object: hosting, graph: REGISTRY_GRAPH }, + { subject: hosting, predicate: `${SKILL}contextGraphsServed`, object: `"${args.contextGraphsServed}"`, graph: REGISTRY_GRAPH }, + ]; + } + + it('returns the peer id for a core agent advertising the exact UAL', async () => { + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + await store.insert(buildAgentProfileQuads({ + walletOrPeerId: '0xb8625ad8', + peerId: '12D3KooWnWcpzsge', + contextGraphsServed: '0x8fb6dcd4/repnet,0x8fb6dcd4/repnet-edge-smoke,0x8fb6dcd4/testing', + })); + + expect(await resolvePeersHostingContextGraph(store, '0x8fb6dcd4/repnet')) + .toEqual(['12D3KooWnWcpzsge']); + expect(await resolvePeersHostingContextGraph(store, '0x8fb6dcd4/testing')) + .toEqual(['12D3KooWnWcpzsge']); + }); + + it('only trusts profile triples in the authoritative agent-registry named graph (Codex Review on PR#556)', async () => { + // A copied / stale profile snapshot landing in some other named + // graph (e.g. a developer's playground graph, or a partial sync + // staging area) MUST NOT influence ACK routing — otherwise the + // collector's priority wave can be skewed toward whichever peers + // happen to appear in the strongest snapshot. + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + + const STRAY_GRAPH = 'did:dkg:context-graph:some-other-graph'; + const buildQuadsInGraph = (peerId: string, graph: string): Quad[] => { + const entity = `did:dkg:agent:${peerId}`; + const hosting = `${entity}/.well-known/genid/hosting`; + return [ + { subject: entity, predicate: RDF_TYPE, object: `${DKG_NS}Agent`, graph }, + { subject: entity, predicate: RDF_TYPE, object: `${DKG_NS}CoreNode`, graph }, + { subject: entity, predicate: `${DKG_NS}nodeRole`, object: `"core"`, graph }, + { subject: entity, predicate: `${DKG_NS}peerId`, object: `"${peerId}"`, graph }, + { subject: entity, predicate: `${SKILL}hostingProfile`, object: hosting, graph }, + { subject: hosting, predicate: `${SKILL}contextGraphsServed`, object: `"0x8fb6dcd4/repnet"`, graph }, + ]; + }; + + await store.insert([ + ...buildQuadsInGraph('12D3KooWauthoritative', REGISTRY_GRAPH), + ...buildQuadsInGraph('12D3KooWstray', STRAY_GRAPH), + ]); + + const peers = await resolvePeersHostingContextGraph(store, '0x8fb6dcd4/repnet'); + expect(peers).toEqual(['12D3KooWauthoritative']); + expect(peers).not.toContain('12D3KooWstray'); + }); + + it('filters out edge nodes even if they advertise contextGraphsServed (Codex Review on PR#556)', async () => { + // Edge nodes don't register the StorageACK protocol handler, so an + // edge node that publishes a contextGraphsServed list (e.g. for + // join-time discovery) must NOT be considered a candidate. Without + // this constraint, `getConnectedCorePeers()`'s early-startup + // fallback to "all connected peers" would let the collector dial a + // peer that just stream-resets the request. + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + await store.insert([ + ...buildAgentProfileQuads({ + walletOrPeerId: '0xCoreA', + peerId: '12D3KooWcoreA', + contextGraphsServed: '0x8fb6dcd4/repnet', + role: 'core', + }), + ...buildAgentProfileQuads({ + walletOrPeerId: '0xEdgeB', + peerId: '12D3KooWedgeB', + contextGraphsServed: '0x8fb6dcd4/repnet', + role: 'edge', + }), + ]); + + const peers = await resolvePeersHostingContextGraph(store, '0x8fb6dcd4/repnet'); + expect(peers).toEqual(['12D3KooWcoreA']); + expect(peers).not.toContain('12D3KooWedgeB'); + }); + + it('does NOT match when the requested UAL is only a string prefix of an entry', async () => { + // Guards against `0x.../repnet` falsely matching `0x.../repnet-edge-smoke`. + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + await store.insert(buildAgentProfileQuads({ + walletOrPeerId: '0xb8625ad8', + peerId: '12D3KooWnWcpzsge', + contextGraphsServed: '0x8fb6dcd4/repnet-edge-smoke,0x8fb6dcd4/testing', + })); + + expect(await resolvePeersHostingContextGraph(store, '0x8fb6dcd4/repnet')) + .toEqual([]); + }); + + it('returns peer ids for the cores that host repnet-v2-official, excludes the one that doesn\'t (the #541 hosting layout)', async () => { + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + + const TARGET = '0x8fb6dcd4B3e07E610958750DbD72Ae4acdce3738/repnet-v2-official'; + const SIBLINGS = '0x8fb6dcd4B3e07E610958750DbD72Ae4acdce3738/repnet,0x8fb6dcd4B3e07E610958750DbD72Ae4acdce3738/repnet-edge-smoke,0x8fb6dcd4B3e07E610958750DbD72Ae4acdce3738/testing'; + + await store.insert([ + ...buildAgentProfileQuads({ + walletOrPeerId: '0x75bdf866', + peerId: '12D3KooWaijsNrWw', + contextGraphsServed: `${SIBLINGS},${TARGET}`, + }), + ...buildAgentProfileQuads({ + walletOrPeerId: '0xf822793d', + peerId: '12D3KooWCe8Q82bZ', + contextGraphsServed: `${SIBLINGS},${TARGET}`, + }), + ...buildAgentProfileQuads({ + walletOrPeerId: '0xb8625ad8', + peerId: '12D3KooWnWcpzsge', + contextGraphsServed: SIBLINGS, + }), + ]); + + const peers = await resolvePeersHostingContextGraph(store, TARGET); + expect(new Set(peers)).toEqual(new Set(['12D3KooWaijsNrWw', '12D3KooWCe8Q82bZ'])); + expect(peers).not.toContain('12D3KooWnWcpzsge'); + }); + + it('returns [] for an empty UAL without throwing', async () => { + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + expect(await resolvePeersHostingContextGraph(store, '')).toEqual([]); + }); + + it('safely handles UALs containing characters that need SPARQL escaping', async () => { + const { resolvePeersHostingContextGraph } = await import('../src/hosting-resolver.js'); + const store = new OxigraphStore(); + // Pathological UAL with a quote — should not produce an unescaped + // SPARQL string and either match cleanly or return [] without throw. + const result = await resolvePeersHostingContextGraph(store, '0xabc/we"ird'); + expect(Array.isArray(result)).toBe(true); + }); +});