diff --git a/fly.toml b/fly.toml index 1988b0ea..06a84f1d 100644 --- a/fly.toml +++ b/fly.toml @@ -6,3 +6,7 @@ primary_region = "cdg" [deploy] strategy = "rolling" + +[mount] + source = "spark-evaluate-data" + destination = "/var/lib/spark-evaluate/" diff --git a/index.js b/index.js index 1b7e90f1..c68767af 100644 --- a/index.js +++ b/index.js @@ -5,6 +5,7 @@ import { evaluate } from './lib/evaluate.js' import { RoundData } from './lib/round.js' import { refreshDatabase } from './lib/platform-stats.js' import timers from 'node:timers/promises' +import { recoverRound, clearRoundBuffer } from './lib/round-buffer.js' // Tweak this value to improve the chances of the data being available const PREPROCESS_DELAY = 60_000 @@ -30,6 +31,13 @@ export const startEvaluate = async ({ const roundsSeen = [] let lastNewEventSeenAt = null + try { + rounds.current = await recoverRound() + } catch (err) { + console.error('CANNOT RECOVER ROUND:', err) + Sentry.captureException(err) + } + const onMeasurementsAdded = async (cid, _roundIndex) => { const roundIndex = BigInt(_roundIndex) if (cidsSeen.includes(cid)) return @@ -89,6 +97,13 @@ export const startEvaluate = async ({ console.log('Event: RoundStart', { roundIndex }) + try { + await clearRoundBuffer() + } catch (err) { + console.error('CANNOT CLEAR ROUND BUFFER:', err) + Sentry.captureException(err) + } + if (!rounds.current) { console.error('No current round data available, skipping evaluation') return diff --git a/lib/preprocess.js b/lib/preprocess.js index 051664b6..a08bd633 100644 --- a/lib/preprocess.js +++ b/lib/preprocess.js @@ -6,6 +6,7 @@ import { validateBlock } from '@web3-storage/car-block-validator' import { recursive as exporter } from 'ipfs-unixfs-exporter' import createDebug from 'debug' import pRetry from 'p-retry' +import { appendToRoundBuffer } from './round-buffer.js' const debug = createDebug('spark:preprocess') @@ -116,6 +117,7 @@ export const preprocess = async ({ logger.log('Retrieval Success Rate: %s%s (%s of %s)', Math.round(100 * okCount / total), '%', okCount, total) round.measurements.push(...validMeasurements) + await appendToRoundBuffer(validMeasurements) recordTelemetry('preprocess', point => { point.intField('round_index', roundIndex) diff --git a/lib/round-buffer.js b/lib/round-buffer.js new file mode 100644 index 00000000..d3eea5a7 --- /dev/null +++ b/lib/round-buffer.js @@ -0,0 +1,44 @@ +import fs from 'node:fs/promises' +import { RoundData } from './round.js' + +const ROUND_BUFFER_PATH = '/var/lib/spark-evaluate/round-buffer.ndjson' + +// TODO: Handle when it's from the wrong round +export const recoverRound = async () => { + let roundBuffer + try { + roundBuffer = await fs.readFile(ROUND_BUFFER_PATH, 'utf8') + } catch (err) { + if (err.code !== 'ENOENT') { + throw err + } + } + if (roundBuffer) { + const lines = roundBuffer.split('\n').filter(Boolean) + if (lines.length > 1) { + const round = new RoundData(JSON.parse(lines[0])) + for (const line of lines.slice(1)) { + round.measurements.push(JSON.parse(line)) + } + return round + } + } +} + +export const appendToRoundBuffer = async validMeasurements => { + await fs.appendFile( + ROUND_BUFFER_PATH, + validMeasurements.map(m => JSON.stringify(m)).join('\n') + '\n' + ) +} + +// TODO: Write round index as first line +export const clearRoundBuffer = async () => { + try { + await fs.writeFile(ROUND_BUFFER_PATH, '') + } catch (err) { + if (err.code !== 'ENOENT') { + throw err + } + } +}