Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 15 additions & 6 deletions packages/indexer-agent/src/commands/start.ts
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,17 @@ export const start = {
required: false,
group: 'Indexing Fees ("DIPs")',
})
.option('dips-on-chain-accept-delay', {
description:
'How long after the off-chain accept to wait before the first on-chain ' +
'acceptIndexingAgreement attempt, in seconds. Gives the payer time to land ' +
"the offer() tx so fewer attempts hit 'offer not yet present'. The deployment " +
'starts syncing immediately regardless of this delay.',
type: 'number',
default: 5,
required: false,
group: 'Indexing Fees ("DIPs")',
})
.check(argv => {
if (
!argv['network-subgraph-endpoint'] &&
Expand Down Expand Up @@ -469,6 +480,7 @@ export async function createNetworkSpecification(
dipsCollectionTarget: argv.dipsCollectionTarget,
dipsCollectionSlippage: argv.dipsCollectionSlippage,
dipsAcceptanceInterval: argv.dipsAcceptanceInterval,
dipsOnChainAcceptDelay: argv.dipsOnChainAcceptDelay,
}

const transactionMonitoring = {
Expand Down Expand Up @@ -566,10 +578,8 @@ export async function run(
logger: Logger,
): Promise<void> {
await common_init(logger)
// --------------------------------------------------------------------------------
// * Configure event listeners for unhandled promise rejections and uncaught
// Configure event listeners for unhandled promise rejections and uncaught
// exceptions.
// --------------------------------------------------------------------------------
process.on('unhandledRejection', err => {
logger.warn(`Unhandled promise rejection`, {
err: indexerError(IndexerErrorCode.IE035, err),
Expand Down Expand Up @@ -786,9 +796,8 @@ export async function run(
}

// Review CLI arguments, emit non-interrupting warnings about expected behavior.
// Perform this check immediately after parsing the command line arguments.
// Ideally, this check could be made inside yargs.check, but we can't access a Logger
// instance in that context.
// Runs right after parsing; can't live in yargs.check because a Logger instance
// isn't accessible there.
export function reviewArgumentsForWarnings(argv: AgentOptions, logger: Logger) {
const {
gasIncreaseTimeout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ function createMockNetwork() {
enableDips: true,
dipsAllocationAmount: 0n,
defaultAllocationAmount: 10000000000000000000n, // 10 GRT
// 0 disables the accept-delay gate so existing tests reach the accept path
// immediately; the delay-specific tests override this.
dipsOnChainAcceptDelay: 0,
},
networkIdentifier: 'eip155:1337',
},
Expand Down Expand Up @@ -758,4 +761,160 @@ describe('DipsManager.acceptPendingProposals', () => {
expect(consumer.markRejected).not.toHaveBeenCalled()
})
})

describe('start syncing on off-chain accept, defer on-chain accept', () => {
test('upserts the DIPS rule before any offer is read', async () => {
// Within the delay window the on-chain accept hasn't run, yet the rule
// (graphNode.ensure path) must already exist so syncing starts.
const proposal = createMockProposal({ createdAt: new Date() })
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(
network.specification.indexerOptions as { dipsOnChainAcceptDelay: number }
).dipsOnChainAcceptDelay = 60
const offerQuery = network.indexingPaymentsSubgraph!.query as jest.Mock
const graphNode = { ensure: jest.fn().mockResolvedValue(undefined) }
const dm = new DipsManager(
logger,
models,
network,
// eslint-disable-next-line @typescript-eslint/no-explicit-any
graphNode as any,
createMockParent(),
// eslint-disable-next-line @typescript-eslint/no-explicit-any
{} as any,
)
// eslint-disable-next-line @typescript-eslint/no-explicit-any
;(dm as any).pendingRcaConsumer = consumer

await dm.acceptPendingProposals([])

expect(models.IndexingRule.upsert).toHaveBeenCalled()
expect(graphNode.ensure).toHaveBeenCalled()
// Deferred: offer not read and no on-chain attempt yet.
expect(offerQuery).not.toHaveBeenCalled()
expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
})

test('does not attempt on-chain accept while within the delay window', async () => {
const proposal = createMockProposal({ createdAt: new Date() })
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(
network.specification.indexerOptions as { dipsOnChainAcceptDelay: number }
).dipsOnChainAcceptDelay = 60
const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
expect(consumer.markAccepted).not.toHaveBeenCalled()
expect(consumer.markRejected).not.toHaveBeenCalled()
})

test('attempts on-chain accept once older than the delay and offer is present', async () => {
const createdAt = new Date(Date.now() - 120 * 1000) // 2 minutes ago
const proposal = createMockProposal({ createdAt })
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(
network.specification.indexerOptions as { dipsOnChainAcceptDelay: number }
).dipsOnChainAcceptDelay = 60
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtx',
status: 1,
})
const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

expect(network.transactionManager.executeTransaction).toHaveBeenCalled()
expect(consumer.markAccepted).toHaveBeenCalledWith(proposal.id)
})

test('keeps the row pending (no accept/reject) when offer is not yet present', async () => {
const createdAt = new Date(Date.now() - 120 * 1000)
const proposal = createMockProposal({ createdAt })
const allocation = createMockAllocation()
const consumer = createMockConsumer([proposal])
const models = createMockModels()
const network = createMockNetwork()
;(
network.specification.indexerOptions as { dipsOnChainAcceptDelay: number }
).dipsOnChainAcceptDelay = 60
;(network.indexingPaymentsSubgraph!.query as jest.Mock).mockResolvedValue({
data: { offer: null },
})
const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])

expect(consumer.markAccepted).not.toHaveBeenCalled()
expect(consumer.markRejected).not.toHaveBeenCalled()
})

test('runs the deadline check before the delay gate', async () => {
// Deadline already passed: must be rejected 'deadline_expired' with its rule
// cleaned up, never held back by the delay gate.
const proposal = createMockProposal({
createdAt: new Date(),
deadline: BigInt(Math.floor(Date.now() / 1000) - 10),
})
const consumer = createMockConsumer([proposal])
;(consumer.getPendingProposalsForDeployment as jest.Mock).mockResolvedValue([])
const mockRule = { id: 7 }
const models = createMockModels()
;(models.IndexingRule.findOne as jest.Mock).mockResolvedValue(mockRule)
const network = createMockNetwork()
;(
network.specification.indexerOptions as { dipsOnChainAcceptDelay: number }
).dipsOnChainAcceptDelay = 3600
const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([])

expect(consumer.markRejected).toHaveBeenCalledWith(proposal.id, 'deadline_expired')
expect(models.IndexingRule.destroy).toHaveBeenCalledWith({ where: { id: 7 } })
expect(network.transactionManager.executeTransaction).not.toHaveBeenCalled()
})

test('accepting twice on the same pending row does not double-accept', async () => {
// Restart idempotency: once markAccepted moves the row out of pending, a
// second pass finds nothing to accept and sends no second transaction.
const createdAt = new Date(Date.now() - 120 * 1000)
const proposal = createMockProposal({ createdAt })
const allocation = createMockAllocation()
// First call sees the pending proposal; after acceptance the row is gone.
const consumer = {
getPendingProposals: jest
.fn()
.mockResolvedValueOnce([proposal])
.mockResolvedValue([]),
getPendingProposalsForDeployment: jest.fn().mockResolvedValue([]),
markAccepted: jest.fn().mockResolvedValue(undefined),
markRejected: jest.fn().mockResolvedValue(undefined),
} as unknown as PendingRcaConsumer
const models = createMockModels()
const network = createMockNetwork()
;(
network.specification.indexerOptions as { dipsOnChainAcceptDelay: number }
).dipsOnChainAcceptDelay = 60
;(network.transactionManager.executeTransaction as jest.Mock).mockResolvedValue({
hash: '0xtx',
status: 1,
})
const dm = createDipsManager(network, models, consumer)

await dm.acceptPendingProposals([allocation])
await dm.acceptPendingProposals([allocation])

expect(consumer.markAccepted).toHaveBeenCalledTimes(1)
expect(network.transactionManager.executeTransaction).toHaveBeenCalledTimes(1)
})
})
})
30 changes: 26 additions & 4 deletions packages/indexer-common/src/indexing-fees/dips.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ import { OfferVerifier } from './offer-verifier'

// POIs are computed against a recent-but-not-tip block to avoid reorg edge cases.
const RECENT_BLOCK_OFFSET = 10
// Margin between the clamped accept-delay and the deadline, so the first on-chain
// attempt always lands with room to spare.
const DIPS_ACCEPT_DEADLINE_MARGIN_SECONDS = 5n
// Per-tick parallelism cap across distinct deployments. acceptPendingProposals
// dedupes to one proposal per deployment, and the transaction manager serialises
// nonce assignment, so concurrent processProposal calls are safe; the cap keeps
Expand Down Expand Up @@ -491,10 +494,9 @@ export class DipsManager {
return
}

// Create the dips rule eagerly here rather than leaving it to the reconcile
// loop: the accept tx can confirm and clear the pending row before the next
// reconcile tick, which would leave the rule uncreated and graph-node never
// told to deploy the subgraph.
// Create the dips rule as soon as the loop sees the pending row, independent
// of the on-chain offer, so graph-node starts syncing on the off-chain accept.
// ensureAgreementRules also creates it; the redundancy is deliberate (backstop).
const tRule = process.hrtime.bigint()
const allDeploymentRules = await this.models.IndexingRule.findAll({
where: { identifierType: SubgraphIdentifierType.DEPLOYMENT },
Expand All @@ -521,6 +523,26 @@ export class DipsManager {
})
phases.ruleMs = elapsedMs(tRule)

// Defer the first on-chain accept by dipsOnChainAcceptDelay seconds, anchored
// on created_at (the off-chain-accept time), to let the payer's offer() tx land
// and cut wasted attempts. Clamped below the deadline so we never sit past it.
const createdAtSeconds = BigInt(Math.floor(proposal.createdAt.getTime() / 1000))
const configuredDelay = BigInt(
Math.floor(this.network.specification.indexerOptions.dipsOnChainAcceptDelay),
)
const maxDelay =
proposal.deadline - createdAtSeconds - DIPS_ACCEPT_DEADLINE_MARGIN_SECONDS
const effectiveDelay = maxDelay < configuredDelay ? maxDelay : configuredDelay
if (effectiveDelay > 0n && now - createdAtSeconds < effectiveDelay) {
this.logger.debug('Within on-chain accept delay window; leaving proposal pending', {
proposalId: proposal.id,
ageSeconds: (now - createdAtSeconds).toString(),
effectiveDelaySeconds: effectiveDelay.toString(),
})
logSummary('waiting_for_accept_delay')
return
}

// Gate accept on the on-chain offer. Pre-flight the RCA offer via the
// indexing-payments-subgraph so acceptIndexingAgreement doesn't revert when
// dipper's offer() tx hasn't landed yet, and reject outright if the on-chain
Expand Down
1 change: 1 addition & 0 deletions packages/indexer-common/src/network-specification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ export const IndexerOptions = z
dipsCollectionTarget: positiveNumber().min(1).max(90).default(50),
dipsCollectionSlippage: positiveNumber().min(0).max(100).default(1),
dipsAcceptanceInterval: positiveNumber().default(5),
dipsOnChainAcceptDelay: positiveNumber().default(5),
})
.strict()
export type IndexerOptions = z.infer<typeof IndexerOptions>
Expand Down
Loading