Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 22 additions & 1 deletion packages/agent/src/dkg-agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ import { EVMChainAdapter, NoChainAdapter, enrichEvmError, type EVMAdapterConfig,
import {
DKGPublisher, PublishHandler, SharedMemoryHandler, UpdateHandler, ChainEventPoller, AccessHandler, AccessClient,
PublishJournal, StaleWriteError,
ACKCollector, StorageACKHandler,
ACKCollector, StorageACKHandler, resolvePeersHostingContextGraph,
VerifyCollector, VerifyProposalHandler, buildVerificationMetadata,
resolveWorkspaceAgentRecipients,
computeTripleHashV10 as computeTripleHash, computeFlatKCRootV10 as computeFlatKCRoot, autoPartition, isReservedSubject, computePrivateRootV10 as computePrivateRoot,
Expand Down Expand Up @@ -14006,6 +14006,27 @@ export class DKGAgent {
// negotiation (fast, no error logs on the remote side).
return connected;
},
getCorePeersHostingContextGraph: async (cgIdStr: string) => {
// Restrict the candidate set to cores whose latest agent profile
// advertises serving the target CG via `skill:contextGraphsServed`.
// This filters out cores that would otherwise have to throw
// "No data found in SWM graph …" inside their StorageACK handler
// (visible to the publisher as a stream reset). The collector
// warns and falls back to the full set if too few cores match,
// so a stale/incomplete hosting registry never blocks publishes.
try {
const advertised = await resolvePeersHostingContextGraph(this.store, cgIdStr);
if (advertised.length === 0) return [];
const advertisedSet = new Set(advertised);
const peers = this.node.libp2p.getPeers();
return peers
.map(p => p.toString())
.filter(id => id !== this.peerId)
.filter(id => advertisedSet.has(id));
} catch {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: Catching and returning [] here hides real resolver/store failures from ACKCollector, so production regressions become indistinguishable from a legitimate 'no hosting signal' fallback. The collector already handles failures safely and logs the cause; either let the error propagate to it, or log the exception before returning [] here (same issue in packages/cli/src/daemon/lifecycle.ts).

return [];
}
},
verifyIdentity: typeof this.chain.verifyACKIdentity === 'function'
? async (recoveredAddress: string, claimedIdentityId: bigint) => {
try {
Expand Down
29 changes: 28 additions & 1 deletion packages/cli/src/daemon/lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,11 @@ const execFileAsync = promisify(execFile);
import { enrichEvmError, MockChainAdapter } from '@origintrail-official/dkg-chain';
import { DKGAgent, loadOpWallets } from '@origintrail-official/dkg-agent';
import { computeNetworkId, createOperationContext, DKGEvent, Logger, PayloadTooLargeError, GET_VIEWS, TrustLevel, validateSubGraphName, validateAssertionName, validateContextGraphId, isSafeIri, assertSafeIri, sparqlIri, contextGraphSharedMemoryUri, contextGraphAssertionUri, contextGraphMetaUri } from '@origintrail-official/dkg-core';
import { findReservedSubjectPrefix, isSkolemizedUri } from '@origintrail-official/dkg-publisher';
import {
findReservedSubjectPrefix,
isSkolemizedUri,
resolvePeersHostingContextGraph,
} from '@origintrail-official/dkg-publisher';
import {
DashboardDB,
MetricsCollector,
Expand Down Expand Up @@ -794,6 +798,29 @@ export async function runDaemonInner(
}
return allPeers;
},
getCorePeersHostingContextGraph: async (cgIdStr: string) => {
// See `packages/publisher/src/ack-collector.ts` — restricts the
// ACK candidate set to cores whose latest agent profile
// advertises `skill:contextGraphsServed`-ing the target CG, so
// we don't waste a quorum slot on a core that would have to
// throw "No data found in SWM graph …" (visible to us as a
// libp2p stream reset). Fail-soft: collector falls back to the
// unfiltered set if too few cores match.
try {
const advertised = await resolvePeersHostingContextGraph(
agent.store,
cgIdStr,
);
if (advertised.length === 0) return [];
const advertisedSet = new Set(advertised);
return agent.node.libp2p
.getPeers()
.map((p) => p.toString())
.filter((id) => id !== agent.peerId && advertisedSet.has(id));
} catch {
return [];
}
},
log,
}),
log,
Expand Down
10 changes: 10 additions & 0 deletions packages/cli/src/publisher-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,15 @@ interface ACKTransportFactory {
gossipPublish: (topic: string, data: Uint8Array) => Promise<void>;
sendP2P: (peerId: string, protocol: string, data: Uint8Array) => Promise<Uint8Array>;
getConnectedCorePeers: () => string[];
/**
* Optional hosting filter, threaded straight through to
* `ACKCollectorDeps.getCorePeersHostingContextGraph`. Implementations
* should consult their local agent registry (e.g. via
* `resolvePeersHostingContextGraph` from `@origintrail-official/dkg-publisher`)
* intersected with currently-connected cores. Optional so existing
* embedders that don't supply a hosting registry still work.
*/
getCorePeersHostingContextGraph?: (cgIdStr: string) => string[] | Promise<string[]>;
log?: (message: string) => void;
}

Expand Down Expand Up @@ -328,6 +337,7 @@ function createV10ACKProviderForPublisher(
gossipPublish: transport.gossipPublish,
sendP2P: transport.sendP2P,
getConnectedCorePeers: transport.getConnectedCorePeers,
getCorePeersHostingContextGraph: transport.getCorePeersHostingContextGraph,
verifyIdentity: async (recoveredAddress: string, claimedIdentityId: bigint) => chain.verifyACKIdentity!(recoveredAddress, claimedIdentityId),
log: transport.log,
});
Expand Down
170 changes: 150 additions & 20 deletions packages/publisher/src/ack-collector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,27 @@ export interface ACKCollectorDeps {
gossipPublish: (topic: string, data: Uint8Array) => Promise<void>;
sendP2P: (peerId: string, protocol: string, data: Uint8Array) => Promise<Uint8Array>;
getConnectedCorePeers: () => string[];
/**
* Optional hosting filter. Given the target context-graph UAL, return
* the subset of currently-connected core peers whose published agent
* profile (`<https://dkg.origintrail.io/skill#contextGraphsServed>`)
* advertises hosting it.
*
* When provided, the collector prefers this filtered candidate set —
* cores that don't host the CG would otherwise have to reject the
* StorageACK request mid-stream (most often as `No data found in SWM
* graph ...`), which the publisher sees as a libp2p stream reset and
* has to retry/timeout against. Filtering up front avoids that cost.
*
* Behaviour when fewer than `requiredACKs` peers match: the collector
* logs a warning naming the CG + which connected cores are vs. aren't
* advertising it, and **falls back to the full connected-core set** so
* publishes still proceed when the hosting registry is incomplete or
* stale. This deliberately keeps the path live during discovery races,
* but makes hosting-coverage bugs (see GitHub issue #541) visible in
* the log instead of presenting as opaque ACK timeouts.
*/
getCorePeersHostingContextGraph?: (cgIdStr: string) => string[] | Promise<string[]>;
verifyIdentity?: (recoveredAddress: string, claimedIdentityId: bigint) => Promise<boolean>;
log?: (msg: string) => void;
}
Expand All @@ -32,6 +53,17 @@ export interface ACKCollectionResult {
const DEFAULT_REQUIRED_ACKS = 3;
const ACK_TIMEOUT_MS = 120_000;
const MAX_RETRIES = 3;
/**
* Hard ceiling for the optional `getCorePeersHostingContextGraph`
* lookup. The lookup runs against the local triple store BEFORE the
* `ACK_TIMEOUT_MS` budget begins, so an unbounded await here can block
* a publish indefinitely if the store is under load or the query
* implementation hangs (Codex Review on PR#556). On timeout the
* collector treats the lookup as "no hosting signal" — falling back to
* the legacy single-wave behaviour against all connected cores —
* rather than escalating into a publish failure.
*/
const HOSTING_FILTER_TIMEOUT_MS = 1_500;

/**
* ACKCollector implements V10 spec §9.0 Phase 3: collecting 3 core node
Expand Down Expand Up @@ -122,16 +154,95 @@ export class ACKCollector {
// that decode payloads as FinalizationMessages, causing decode errors.
log(`[ACKCollector] Collecting ACKs via direct P2P (merkleRoot=${ethers.hexlify(merkleRoot).slice(0, 18)}...)`);

const corePeers = this.deps.getConnectedCorePeers();
if (corePeers.length === 0) {
const allConnected = this.deps.getConnectedCorePeers();
if (allConnected.length === 0) {
throw new Error('ACK collection failed: no connected core peers');
}
if (corePeers.length < REQUIRED_ACKS) {

// Split connected cores into two waves:
// priorityPeers — advertise hosting the target CG; tried first.
// fallbackPeers — connected but don't advertise; only tried if
// the priority wave can't satisfy quorum.
//
// Cores outside the priority set are NOT a hard gate (a stale or
// missing advertisement on one peer in `priorityPeers` shouldn't
// be able to fail a publish that the rest of the connected pool
// could have satisfied — Codex Review on PR#556 flagged this).
// Wave 2 only fires when wave 1 doesn't reach quorum, so the happy
// path still avoids dialling cores that would just decline / reset
// the stream (the GitHub #541 cost).
let priorityPeers: string[] = allConnected;
let fallbackPeers: string[] = [];
if (this.deps.getCorePeersHostingContextGraph) {
let hostingPeers: string[] = [];
try {
const lookupPromise = Promise.resolve(
this.deps.getCorePeersHostingContextGraph(contextGraphIdStr),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Issue: The hosting lookup is keyed by contextGraphIdStr, but the ACK handler loads data from swmGraphId ?? contextGraphIdStr. In the remap flow, peers advertise the source SWM graph, not the target on-chain ACK domain, so this will prioritize the wrong peers or miss the hosting signal entirely. Use params.swmGraphId ?? contextGraphIdStr for the hosting-filter query.

);
// Bound the local-store lookup so a slow / hung registry query
// can't block the publish before the ACK_TIMEOUT_MS budget even
// begins (Codex Review on PR#556). On timeout we treat the
// result as "no hosting signal" — i.e. fall back to the legacy
// single-wave path against all connected cores.
let timeoutHandle: ReturnType<typeof setTimeout> | undefined;
const timeoutSentinel: unique symbol = Symbol('hosting-filter-timeout');
const timeoutPromise = new Promise<typeof timeoutSentinel>(resolve => {
timeoutHandle = setTimeout(() => resolve(timeoutSentinel), HOSTING_FILTER_TIMEOUT_MS);
});
const settled = await Promise.race([lookupPromise, timeoutPromise]);
if (timeoutHandle) clearTimeout(timeoutHandle);
if (settled === timeoutSentinel) {
log(
`[ACKCollector] hosting-filter lookup did not return within ${HOSTING_FILTER_TIMEOUT_MS}ms for "${contextGraphIdStr}"; ` +
`dialling all ${allConnected.length} connected cores in a single wave`,
);
hostingPeers = [];
} else {
hostingPeers = settled;
}
} catch (err) {
const errMsg = err instanceof Error ? err.message : String(err);
log(
`[ACKCollector] hosting-filter lookup failed for "${contextGraphIdStr}" (${errMsg}); ` +
`dialling all ${allConnected.length} connected cores in a single wave`,
);
hostingPeers = [];
}
const hostingSet = new Set(hostingPeers);
const matched = allConnected.filter(p => hostingSet.has(p));
const excluded = allConnected.filter(p => !hostingSet.has(p));

if (matched.length === 0) {
// No hosting signal at all — keep the legacy single-wave shape.
log(
`[ACKCollector] hosting filter: no connected cores advertise hosting "${contextGraphIdStr}"; ` +
`dialling all ${allConnected.length} connected cores in a single wave (expect declines from non-hosting cores).`,
);
} else {
priorityPeers = matched;
fallbackPeers = excluded;
const includedTag = matched.map(p => p.slice(-8)).join(', ');
const excludedTag = excluded.length > 0 ? excluded.map(p => p.slice(-8)).join(', ') : '<none>';
log(
`[ACKCollector] hosting filter: priority wave = ${matched.length}/${allConnected.length} cores advertising "${contextGraphIdStr}" [${includedTag}]; ` +
`fallback wave = ${excluded.length} non-advertising cores [${excludedTag}].`,
);
if (matched.length < REQUIRED_ACKS) {
log(
`[ACKCollector] WARN: only ${matched.length} connected cores advertise hosting "${contextGraphIdStr}" (need ${REQUIRED_ACKS}); ` +
`fallback wave will be dialled if priority wave can't satisfy quorum. ` +
`If "${contextGraphIdStr}" has replicationPolicy=full, the non-advertising cores have a coverage bug (see GitHub issue #541).`,
);
}
}
}

if (allConnected.length < REQUIRED_ACKS) {
throw new Error(
`ACK collection failed: need ${REQUIRED_ACKS} ACKs but only ${corePeers.length} core peers connected — quorum impossible`,
`ACK collection failed: need ${REQUIRED_ACKS} ACKs but only ${allConnected.length} core peers connected — quorum impossible`,
);
}
log(`[ACKCollector] Requesting ACKs from ${corePeers.length} core peers (need ${REQUIRED_ACKS})`);
log(`[ACKCollector] Requesting ACKs from ${allConnected.length} core peers (need ${REQUIRED_ACKS}; priority=${priorityPeers.length}, fallback=${fallbackPeers.length})`);

const ackDigest = computePublishACKDigest(
chainId,
Expand Down Expand Up @@ -202,22 +313,41 @@ export class ACKCollector {
let quorumResolve: (() => void) | undefined;
const quorumPromise = new Promise<void>(resolve => { quorumResolve = resolve; });

/**
* Dial a wave of peers in parallel, accumulating their ACKs into
* the shared `collected` slot. Stops early when the publisher has
* `REQUIRED_ACKS` distinct (peer, identity) ACKs.
*/
const dialWave = async (peers: string[]): Promise<void> => {
if (peers.length === 0) return;
const promises = peers.map(async (peerId) => {
if (collected.length >= REQUIRED_ACKS) return;
const ack = await requestACK(peerId);
if (ack && !seenPeers.has(ack.peerId) && !seenIdentityIds.has(ack.nodeIdentityId)) {
seenPeers.add(ack.peerId);
seenIdentityIds.add(ack.nodeIdentityId);
collected.push(ack);
if (collected.length >= REQUIRED_ACKS) {
quorumResolve?.();
}
}
});
await Promise.race([Promise.allSettled(promises), quorumPromise]);
};

let triedPeerCount = 0;
await Promise.race([
(async () => {
const promises = corePeers.map(async (peerId) => {
if (collected.length >= REQUIRED_ACKS) return;
const ack = await requestACK(peerId);
if (ack && !seenPeers.has(ack.peerId) && !seenIdentityIds.has(ack.nodeIdentityId)) {
seenPeers.add(ack.peerId);
seenIdentityIds.add(ack.nodeIdentityId);
collected.push(ack);
if (collected.length >= REQUIRED_ACKS) {
quorumResolve?.();
return;
}
}
});
await Promise.race([Promise.allSettled(promises), quorumPromise]);
await dialWave(priorityPeers);
triedPeerCount = priorityPeers.length;
if (collected.length < REQUIRED_ACKS && fallbackPeers.length > 0) {
log(
`[ACKCollector] Priority wave settled with ${collected.length}/${REQUIRED_ACKS} ACKs; ` +
`dialling fallback wave (${fallbackPeers.length} non-advertising core(s))`,
);
await dialWave(fallbackPeers);
triedPeerCount += fallbackPeers.length;
}
})(),
new Promise<never>((_, reject) =>
setTimeout(() => reject(new Error(`storage_ack_timeout: only ${collected.length}/${REQUIRED_ACKS} ACKs received within ${ACK_TIMEOUT_MS}ms`)),
Expand All @@ -229,7 +359,7 @@ export class ACKCollector {
if (collected.length < REQUIRED_ACKS) {
throw new Error(
`storage_ack_insufficient: got ${collected.length}/${REQUIRED_ACKS} valid ACKs. ` +
`Tried ${corePeers.length} core peers.`,
`Tried ${triedPeerCount} core peers.`,
);
}

Expand Down
Loading
Loading