Skip to content

Commit eccf084

Browse files
author
Ariel Melendez
committed
feat(cu): add data gap prevention checks and dry run for verifier PE-8738
1 parent ec9adb0 commit eccf084

File tree

2 files changed

+319
-2
lines changed

2 files changed

+319
-2
lines changed

servers/cu/scripts/run-verifier.js

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ Options:
4646
--interval <ms> Ms between cycles (default: 60000)
4747
--once Run one cycle and exit
4848
--stats Show stats and exit
49+
--dry-run Check for nonce gaps and verify without writing to DB
4950
5051
Examples:
5152
node scripts/run-verifier.js qNvAoz0TgcH7DMg8BCVn8jF32QH5L6T29VjHxhHqqGE
@@ -67,7 +68,8 @@ Examples:
6768
batchSize: 100,
6869
intervalMs: 60000,
6970
once: false,
70-
stats: false
71+
stats: false,
72+
dryRun: false
7173
}
7274

7375
for (let i = 1; i < args.length; i++) {
@@ -99,6 +101,9 @@ Examples:
99101
case '--stats':
100102
options.stats = true
101103
break
104+
case '--dry-run':
105+
options.dryRun = true
106+
break
102107
default:
103108
console.error(`Unknown option: ${args[i]}`)
104109
process.exit(1)
@@ -134,6 +139,56 @@ async function main () {
134139
return
135140
}
136141

142+
if (options.dryRun) {
143+
// Dry run: check for nonce gaps and verify without writing
144+
const verifier = new MessageVerifier({
145+
processId: options.processId,
146+
discoveryDbDir: options.discoveryDbDir,
147+
verificationDbDir: options.verificationDbDir,
148+
cacheDbPath: options.cacheDbPath,
149+
graphqlUrl: options.graphqlUrl,
150+
retryAfterMinutes: options.retryAfterMinutes,
151+
batchSize: options.batchSize
152+
})
153+
verifier.init()
154+
155+
console.log(`Dry run for process ${options.processId}`)
156+
console.log('Checking for nonce gaps between source and verification DBs...\n')
157+
158+
const gapCheck = verifier.checkNonceGap()
159+
160+
console.log('Source DB:')
161+
console.log(` Min nonce: ${gapCheck.source.minNonce ?? 'N/A'}`)
162+
console.log(` Max nonce: ${gapCheck.source.maxNonce ?? 'N/A'}`)
163+
164+
console.log('\nVerification DB:')
165+
console.log(` Min nonce: ${gapCheck.verification.minNonce ?? 'N/A'}`)
166+
console.log(` Max nonce: ${gapCheck.verification.maxNonce ?? 'N/A'}`)
167+
console.log(` Next nonce needed: ${gapCheck.verification.nextNonceNeeded}`)
168+
169+
if (gapCheck.hasGap) {
170+
console.log('\n⚠️ WARNING: Nonce gap detected!')
171+
console.log(` ${gapCheck.gapReason}`)
172+
console.log('\nDry run aborted due to gap. Verification would produce incomplete results.')
173+
verifier.close()
174+
process.exit(1)
175+
}
176+
177+
console.log('\n✓ No nonce gaps detected. Proceeding with dry-run verification...\n')
178+
179+
const result = await verifier.runDryRunVerification()
180+
181+
console.log('\nDry Run Results:')
182+
console.log(` Would sync: ${result.wouldSync} messages`)
183+
console.log(` Verified: ${result.verified}`)
184+
console.log(` Found on Arweave: ${result.found}`)
185+
console.log(` Not found: ${result.notFound}`)
186+
console.log('\nNo changes were written to the verification DB.')
187+
188+
verifier.close()
189+
return
190+
}
191+
137192
if (options.once) {
138193
// Run one cycle and exit
139194
const verifier = new MessageVerifier({

servers/cu/src/domain/message-verifier.js

Lines changed: 263 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,15 @@ export class MessageVerifier {
9292
const lastNonce = this.verificationDb.getLastSyncedNonce(this.processId)
9393

9494
// Query discovery DB for rows with nonce > lastNonce
95+
this.logger.info(`Querying discovery DB for rows with nonce > ${lastNonce}...`)
96+
const queryStart = Date.now()
97+
9598
const newRows = this.discoveryDb.query(
9699
'SELECT * FROM process_messages WHERE nonce > ? ORDER BY nonce ASC, output_message_index ASC',
97100
[lastNonce]
98101
)
102+
const queryMs = Date.now() - queryStart
103+
this.logger.info(`Discovery DB query returned ${newRows.length} rows in ${queryMs}ms`)
99104

100105
if (newRows.length === 0) {
101106
this.logger.debug('No new messages to sync from discovery DB')
@@ -121,6 +126,9 @@ export class MessageVerifier {
121126
const lastNonce = this.verificationDb.getLastSyncedNonce(this.processId)
122127

123128
// Query evaluations table for rows with Messages in output, for this process
129+
this.logger.info(`Querying cache DB for rows with nonce > ${lastNonce}...`)
130+
const queryStart = Date.now()
131+
124132
const stmt = this.cacheDb.prepare(`
125133
SELECT
126134
messageId,
@@ -135,6 +143,8 @@ export class MessageVerifier {
135143
`)
136144

137145
const rows = stmt.all(this.processId, lastNonce)
146+
const queryMs = Date.now() - queryStart
147+
this.logger.info(`Cache DB query returned ${rows.length} rows in ${queryMs}ms`)
138148

139149
if (rows.length === 0) {
140150
this.logger.debug('No new messages to sync from cache DB')
@@ -185,11 +195,246 @@ export class MessageVerifier {
185195
return verificationRows.length
186196
}
187197

198+
/**
199+
* Get nonce range from source DB (cache or discovery)
200+
*/
201+
getSourceNonceRange () {
202+
if (this.cacheDb) {
203+
// Use separate MIN/MAX queries to leverage index, avoid COUNT(*)
204+
this.logger.info('Source range: querying cache DB MIN(nonce)...')
205+
let start = Date.now()
206+
const minResult = this.cacheDb.prepare(`
207+
SELECT MIN(nonce) as minNonce
208+
FROM evaluations
209+
WHERE processId = ?
210+
AND json_array_length(json_extract(output, '$.Messages')) > 0
211+
`).get(this.processId)
212+
this.logger.info(`Source range: cache DB MIN query took ${Date.now() - start}ms`)
213+
214+
// If no min, there are no rows
215+
if (minResult.minNonce === null) {
216+
return { minNonce: null, maxNonce: null, count: 0 }
217+
}
218+
219+
this.logger.info('Source range: querying cache DB MAX(nonce)...')
220+
start = Date.now()
221+
const maxResult = this.cacheDb.prepare(`
222+
SELECT MAX(nonce) as maxNonce
223+
FROM evaluations
224+
WHERE processId = ?
225+
AND json_array_length(json_extract(output, '$.Messages')) > 0
226+
`).get(this.processId)
227+
this.logger.info(`Source range: cache DB MAX query took ${Date.now() - start}ms`)
228+
229+
return {
230+
minNonce: minResult.minNonce,
231+
maxNonce: maxResult.maxNonce,
232+
count: -1 // Unknown, but not zero
233+
}
234+
} else {
235+
// Use separate queries for discovery DB too
236+
this.logger.info('Source range: querying discovery DB MIN(nonce)...')
237+
let start = Date.now()
238+
const minResult = this.discoveryDb.query(
239+
'SELECT MIN(nonce) as minNonce FROM process_messages'
240+
)[0]
241+
this.logger.info(`Source range: discovery DB MIN query took ${Date.now() - start}ms`)
242+
243+
if (minResult.minNonce === null) {
244+
return { minNonce: null, maxNonce: null, count: 0 }
245+
}
246+
247+
this.logger.info('Source range: querying discovery DB MAX(nonce)...')
248+
start = Date.now()
249+
const maxResult = this.discoveryDb.query(
250+
'SELECT MAX(nonce) as maxNonce FROM process_messages'
251+
)[0]
252+
this.logger.info(`Source range: discovery DB MAX query took ${Date.now() - start}ms`)
253+
254+
return {
255+
minNonce: minResult.minNonce,
256+
maxNonce: maxResult.maxNonce,
257+
count: -1 // Unknown, but not zero
258+
}
259+
}
260+
}
261+
262+
/**
263+
* Check for nonce gaps between source and verification DBs
264+
* Returns gap info and whether it's safe to proceed
265+
*/
266+
checkNonceGap () {
267+
this.logger.info('Gap check: querying source DB nonce range...')
268+
let start = Date.now()
269+
const sourceRange = this.getSourceNonceRange()
270+
this.logger.info(`Gap check: source DB query took ${Date.now() - start}ms`)
271+
272+
this.logger.info('Gap check: getting last synced nonce...')
273+
start = Date.now()
274+
const lastSyncedNonce = this.verificationDb.getLastSyncedNonce(this.processId)
275+
this.logger.info(`Gap check: last synced nonce query took ${Date.now() - start}ms`)
276+
277+
// Get min/max nonce from verification DB (separate queries to use index efficiently)
278+
this.logger.info('Gap check: querying verification DB MIN(nonce)...')
279+
start = Date.now()
280+
const verificationMin = this.verificationDb.query(
281+
'SELECT MIN(nonce) as minNonce FROM verification_messages'
282+
)[0] || { minNonce: null }
283+
this.logger.info(`Gap check: verification MIN query took ${Date.now() - start}ms`)
284+
285+
this.logger.info('Gap check: querying verification DB MAX(nonce)...')
286+
start = Date.now()
287+
const verificationMax = this.verificationDb.query(
288+
'SELECT MAX(nonce) as maxNonce FROM verification_messages'
289+
)[0] || { maxNonce: null }
290+
this.logger.info(`Gap check: verification MAX query took ${Date.now() - start}ms`)
291+
292+
const verificationHasRows = verificationMax.maxNonce !== null
293+
const nextNonceNeeded = lastSyncedNonce + 1
294+
295+
const result = {
296+
source: {
297+
minNonce: sourceRange.minNonce,
298+
maxNonce: sourceRange.maxNonce,
299+
count: sourceRange.count
300+
},
301+
verification: {
302+
minNonce: verificationMin.minNonce,
303+
maxNonce: verificationMax.maxNonce,
304+
nextNonceNeeded
305+
},
306+
hasGap: false,
307+
gapReason: null
308+
}
309+
310+
// Check for gaps
311+
if (sourceRange.minNonce === null) {
312+
result.hasGap = true
313+
result.gapReason = 'Source DB has no messages with output'
314+
} else if (!verificationHasRows) {
315+
// First sync - no gap issue, source just needs to start from beginning
316+
if (sourceRange.minNonce > 0) {
317+
// This might be okay if process started at nonce > 0, but warn anyway
318+
result.hasGap = false
319+
}
320+
} else if (nextNonceNeeded < sourceRange.minNonce) {
321+
// Verification DB needs nonces that source DB doesn't have (source starts too late)
322+
result.hasGap = true
323+
result.gapReason = `Verification needs nonce ${nextNonceNeeded} but source DB starts at nonce ${sourceRange.minNonce}`
324+
} else if (verificationMin.minNonce !== null && verificationMin.minNonce > sourceRange.maxNonce) {
325+
// Verification DB is ahead of source (shouldn't happen normally)
326+
result.hasGap = true
327+
result.gapReason = `Verification DB min nonce (${verificationMin.minNonce}) is higher than source DB max nonce (${sourceRange.maxNonce})`
328+
}
329+
330+
return result
331+
}
332+
333+
/**
334+
* Get rows that would be synced from source DB (without writing)
335+
*/
336+
getRowsToSync () {
337+
const lastNonce = this.verificationDb.getLastSyncedNonce(this.processId)
338+
339+
if (this.cacheDb) {
340+
const stmt = this.cacheDb.prepare(`
341+
SELECT
342+
messageId,
343+
nonce,
344+
timestamp,
345+
json_extract(output, '$.Messages') as messages
346+
FROM evaluations
347+
WHERE processId = ?
348+
AND nonce > ?
349+
AND json_array_length(json_extract(output, '$.Messages')) > 0
350+
ORDER BY nonce ASC
351+
`)
352+
353+
const rows = stmt.all(this.processId, lastNonce)
354+
const verificationRows = []
355+
356+
for (const row of rows) {
357+
const messages = JSON.parse(row.messages)
358+
for (let i = 0; i < messages.length; i++) {
359+
const msg = messages[i]
360+
const tags = msg.Tags || []
361+
const referenceTag = tags.find(t => t.name === 'Reference')
362+
const actionTag = tags.find(t => t.name === 'Action')
363+
364+
if (!referenceTag) continue
365+
366+
verificationRows.push({
367+
nonce: row.nonce,
368+
input_message_id: row.messageId,
369+
output_message_reference: referenceTag.value,
370+
output_message_target: msg.Target,
371+
output_message_action: actionTag ? actionTag.value : null,
372+
output_message_index: i,
373+
created_at: row.timestamp
374+
})
375+
}
376+
}
377+
378+
return verificationRows
379+
} else {
380+
return this.discoveryDb.query(
381+
'SELECT * FROM process_messages WHERE nonce > ? ORDER BY nonce ASC, output_message_index ASC',
382+
[lastNonce]
383+
)
384+
}
385+
}
386+
387+
/**
388+
* Run verification without writing to DB (dry run mode)
389+
*/
390+
async runDryRunVerification () {
391+
const rowsToSync = this.getRowsToSync()
392+
393+
let verified = 0
394+
let found = 0
395+
let notFound = 0
396+
397+
// Process in batches
398+
for (let i = 0; i < rowsToSync.length; i += this.batchSize) {
399+
const batch = rowsToSync.slice(i, i + this.batchSize)
400+
const foundMessages = await this.findMessagesOnArweave(batch)
401+
402+
let batchFound = 0
403+
let batchNotFound = 0
404+
405+
for (const row of batch) {
406+
const messageId = foundMessages.get(row.output_message_reference)
407+
if (messageId) {
408+
found++
409+
batchFound++
410+
} else {
411+
notFound++
412+
batchNotFound++
413+
}
414+
verified++
415+
}
416+
417+
this.logger.info(`Dry run batch ${Math.floor(i / this.batchSize) + 1}: verified=${batch.length}, found=${batchFound}, notFound=${batchNotFound}`)
418+
}
419+
420+
return {
421+
wouldSync: rowsToSync.length,
422+
verified,
423+
found,
424+
notFound
425+
}
426+
}
427+
188428
/**
189429
* Get pending rows that need verification
190430
*/
191431
getRowsToVerify () {
192-
return this.verificationDb.getRowsToVerify(this.retryAfterMs, this.batchSize)
432+
this.logger.info('Querying verification DB for rows to verify...')
433+
const queryStart = Date.now()
434+
const rows = this.verificationDb.getRowsToVerify(this.retryAfterMs, this.batchSize)
435+
const queryMs = Date.now() - queryStart
436+
this.logger.info(`Verification DB query returned ${rows.length} rows in ${queryMs}ms`)
437+
return rows
193438
}
194439

195440
/**
@@ -442,6 +687,23 @@ export async function runVerifier ({
442687
console.log(`Batch size: ${options.batchSize || 100}`)
443688
console.log(`Cycle interval: ${intervalMs}ms`)
444689

690+
// Check for nonce gaps at startup
691+
console.log('\nChecking for nonce gaps between source and verification DBs...')
692+
const gapCheck = verifier.checkNonceGap()
693+
694+
console.log(`Source DB: minNonce=${gapCheck.source.minNonce ?? 'N/A'}, maxNonce=${gapCheck.source.maxNonce ?? 'N/A'}`)
695+
console.log(`Verification DB: minNonce=${gapCheck.verification.minNonce ?? 'N/A'}, maxNonce=${gapCheck.verification.maxNonce ?? 'N/A'}, nextNonceNeeded=${gapCheck.verification.nextNonceNeeded}`)
696+
697+
if (gapCheck.hasGap) {
698+
console.error('\n⚠️ ERROR: Nonce gap detected!')
699+
console.error(` ${gapCheck.gapReason}`)
700+
console.error('\nTerminating. Please ensure source DB has continuous nonce coverage.')
701+
verifier.close()
702+
process.exit(1)
703+
}
704+
705+
console.log('✓ No nonce gaps detected.\n')
706+
445707
let running = true
446708

447709
// Handle shutdown

0 commit comments

Comments
 (0)