Meridian #226

Draft
wants to merge 2 commits into base: main
6 changes: 4 additions & 2 deletions bin/spark.js → bin/meridian.js
@@ -62,8 +62,10 @@ await migrate(client)
const getCurrentRound = await createRoundGetter(client)

const round = getCurrentRound()
assert(!!round, 'cannot obtain the current Spark round number')
console.log('SPARK round number at service startup:', round.sparkRoundNumber)
assert(!!round, 'cannot obtain the current module round numbers')
for (const [moduleId, moduleRoundNumber] of round.moduleRoundNumbers.entries()) {
console.log('%s round number at service startup: %s', moduleId, moduleRoundNumber)
}

const logger = {
error: console.error,
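For reference, a minimal sketch (not part of this diff) of the shape the startup loop above expects from `getCurrentRound()`; the module ids and round values are illustrative only.

```js
// Illustrative values only; mirrors the per-module startup logging in bin/meridian.js.
const round = {
  moduleRoundNumbers: new Map([[0, 12345n], [1, 17n]]),
  meridianContractAddresses: new Map([[0, '0x8460766edc62b525fc1fa4d628fc79229dc73031']]),
  meridianRoundIndexes: new Map([[0, 123n]])
}

for (const [moduleId, moduleRoundNumber] of round.moduleRoundNumbers.entries()) {
  console.log('%s round number at service startup: %s', moduleId, moduleRoundNumber)
}
```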
141 changes: 34 additions & 107 deletions index.js
@@ -3,8 +3,13 @@ import Sentry from '@sentry/node'
import getRawBody from 'raw-body'
import assert from 'http-assert'
import { validate } from './lib/validate.js'
import { mapRequestToInetGroup } from './lib/inet-grouping.js'
import { satisfies } from 'compare-versions'
import * as spark from './lib/spark.js'
import * as voyager from './lib/voyager.js'

const moduleImplementations = {
0: spark,
1: voyager
}

const handler = async (req, res, client, getCurrentRound, domain) => {
if (req.headers.host.split(':')[0] !== domain) {
@@ -22,8 +27,10 @@ const handler = async (req, res, client, getCurrentRound, domain) => {
} else if (segs[0] === 'measurements' && req.method === 'GET') {
await getMeasurement(req, res, client, Number(segs[1]))
} else if (segs[0] === 'rounds' && segs[1] === 'meridian' && req.method === 'GET') {
// TODO: Add moduleId
await getMeridianRoundDetails(req, res, client, segs[2], segs[3])
} else if (segs[0] === 'rounds' && req.method === 'GET') {
// TODO: Add moduleId
await getRoundDetails(req, res, client, getCurrentRound, segs[1])
} else if (segs[0] === 'inspect-request' && req.method === 'GET') {
await inspectRequest(req, res)
@@ -33,122 +40,57 @@ const handler = async (req, res, client, getCurrentRound, domain) => {
}

const createMeasurement = async (req, res, client, getCurrentRound) => {
const { sparkRoundNumber } = getCurrentRound()
const body = await getRawBody(req, { limit: '100kb' })
const measurement = JSON.parse(body)
validate(measurement, 'sparkVersion', { type: 'string', required: false })

validate(measurement, 'zinniaVersion', { type: 'string', required: false })
assert(
typeof measurement.sparkVersion === 'string' && satisfies(measurement.sparkVersion, '>=1.9.0'),
410, 'OUTDATED CLIENT'
)

// Backwards-compatibility with older clients sending walletAddress instead of participantAddress
// We can remove this after enough SPARK clients are running the new version (mid-October 2023)
if (!('participantAddress' in measurement) && ('walletAddress' in measurement)) {
validate(measurement, 'walletAddress', { type: 'string', required: true })
measurement.participantAddress = measurement.walletAddress
delete measurement.walletAddress
}

validate(measurement, 'cid', { type: 'string', required: true })
validate(measurement, 'providerAddress', { type: 'string', required: true })
validate(measurement, 'protocol', { type: 'string', required: true })
validate(measurement, 'participantAddress', { type: 'string', required: true })
validate(measurement, 'timeout', { type: 'boolean', required: false })
validate(measurement, 'startAt', { type: 'date', required: true })
validate(measurement, 'statusCode', { type: 'number', required: false })
validate(measurement, 'firstByteAt', { type: 'date', required: false })
validate(measurement, 'endAt', { type: 'date', required: false })
validate(measurement, 'byteLength', { type: 'number', required: false })
validate(measurement, 'attestation', { type: 'string', required: false })
validate(measurement, 'carTooLarge', { type: 'boolean', required: false })
validate(measurement, 'carChecksum', { type: 'string', required: false })
validate(measurement, 'indexerResult', { type: 'string', required: false })
validate(measurement, 'moduleId', { type: 'number', required: false })

const moduleId = measurement.moduleId || 0
const moduleImplementation = moduleImplementations[moduleId]
assert(moduleImplementation, `Unknown moduleId: ${moduleId}`)

const inetGroup = await mapRequestToInetGroup(client, req)
moduleImplementation.validateMeasurement(measurement)

const { rows } = await client.query(`
INSERT INTO measurements (
spark_version,
zinnia_version,
cid,
provider_address,
protocol,
participant_address,
timeout,
start_at,
status_code,
first_byte_at,
end_at,
byte_length,
attestation,
inet_group,
car_too_large,
car_checksum,
indexer_result,
completed_at_round
)
VALUES (
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18
)
RETURNING id
`, [
measurement.sparkVersion,
measurement.zinniaVersion,
measurement.cid,
measurement.providerAddress,
measurement.protocol,
measurement.participantAddress,
measurement.timeout || false,
parseOptionalDate(measurement.startAt),
measurement.statusCode,
parseOptionalDate(measurement.firstByteAt),
parseOptionalDate(measurement.endAt),
measurement.byteLength,
measurement.attestation,
inetGroup,
measurement.carTooLarge ?? false,
measurement.carChecksum,
measurement.indexerResult,
sparkRoundNumber
INSERT INTO measurements (module_id, data, completed_at_round)
VALUES ($1, $2, $3)
RETURNING id
`, [
moduleId,
JSON.stringify(moduleImplementation.sanitizeMeasurement({ measurement, inetGroup })),
getCurrentRound().moduleRoundNumbers.get(moduleId)
])

json(res, { id: rows[0].id })
}

const getMeasurement = async (req, res, client, measurementId) => {
assert(!Number.isNaN(measurementId), 400, 'Invalid RetrievalResult ID')
const { rows: [resultRow] } = await client.query(`
SELECT *
FROM measurements
WHERE id = $1
`, [
measurementId
])
const { rows: [resultRow] } = await client.query(
`SELECT module_id, data, completed_at_round FROM measurements WHERE id = $1`,
[measurementId]
)
assert(resultRow, 404, 'Measurement Not Found')
json(res, {
id: resultRow.id,
cid: resultRow.cid,
providerAddress: resultRow.provider_address,
protocol: resultRow.protocol,
sparkVersion: resultRow.spark_version,
zinniaVersion: resultRow.zinnia_version,
createdAt: resultRow.created_at,
finishedAt: resultRow.finished_at,
timeout: resultRow.timeout,
startAt: resultRow.start_at,
statusCode: resultRow.status_code,
firstByteAt: resultRow.first_byte_at,
endAt: resultRow.end_at,
byteLength: resultRow.byte_length,
carTooLarge: resultRow.car_too_large,
attestation: resultRow.attestation
...JSON.parse(resultRow.data),
id: measurementId,
moduleId: resultRow.module_id,
moduleRound: resultRow.completed_at_round
})
}

const getRoundDetails = async (req, res, client, getCurrentRound, roundParam) => {
const getRoundDetails = async (req, res, client, getCurrentRound, roundParam, moduleId) => {
if (roundParam === 'current') {
const { meridianContractAddress, meridianRoundIndex } = getCurrentRound()
const { meridianContractAddresses, meridianRoundIndexes } = getCurrentRound()
const addr = encodeURIComponent(meridianContractAddresses.get(moduleId))
const idx = encodeURIComponent(meridianRoundIndexes.get(moduleId))
const location = `/rounds/meridian/${addr}/${idx}`
@@ -291,18 +233,3 @@ export const createHandler = async ({
})
}
}

/**
* Parse a date string field that may be `undefined` or `null`.
*
* - undefined -> undefined
* - null -> undefined
* - "iso-date-string" -> new Date("iso-date-string")
*
* @param {string | null | undefined} str
* @returns {Date | undefined}
*/
const parseOptionalDate = (str) => {
if (str === undefined || str === null) return undefined
return new Date(str)
}
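A hedged sketch of how a client could exercise the new module dispatch, assuming the existing `POST /measurements` route (not shown in this diff) is unchanged; the host and all field values below are illustrative.

```js
// Illustrative only: submit a measurement for moduleId 0 (SPARK, per moduleImplementations).
const res = await fetch('https://api.example.com/measurements', {
  method: 'POST',
  body: JSON.stringify({
    moduleId: 0,
    sparkVersion: '1.9.0',
    cid: 'bafybeigdyrzt5example',
    providerAddress: '/dns/provider.example/tcp/443/https',
    protocol: 'graphsync',
    participantAddress: 'f410fexample',
    startAt: new Date().toISOString()
  })
})
console.log(await res.json()) // => { id: <measurement id> }
```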
6 changes: 3 additions & 3 deletions lib/ie-contract.js
@@ -1,5 +1,5 @@
import { ethers } from 'ethers'
import { IE_CONTRACT_ABI, IE_CONTRACT_ADDRESS, RPC_URL, GLIF_TOKEN } from '../spark-publish/ie-contract-config.js'
import { IE_CONTRACT_ABI, RPC_URL, GLIF_TOKEN } from '../spark-publish/ie-contract-config.js'

const provider = new ethers.providers.JsonRpcProvider({
url: RPC_URL,
@@ -11,8 +11,8 @@ const provider = new ethers.providers.JsonRpcProvider({
// Uncomment for troubleshooting
// provider.on('debug', d => console.log('[ethers:debug] %s\nrequest: %o\nresponse: %o', d.action, d.request, d.response))

export const createMeridianContract = async () => new ethers.Contract(
IE_CONTRACT_ADDRESS,
export const createMeridianContract = address => new ethers.Contract(
address,
IE_CONTRACT_ABI,
provider
)
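A minimal usage sketch (not part of this diff), assuming it runs from the repo root: the contract address is now supplied by the caller, e.g. read from the new `modules` table, instead of the removed `IE_CONTRACT_ADDRESS` constant.

```js
import { createMeridianContract } from './lib/ie-contract.js'

// Address taken from the modules seed data in this PR; any deployed IE contract address works here.
const contract = createMeridianContract('0x8460766edc62b525fc1fa4d628fc79229dc73031')
contract.on('RoundStart', newRoundIndex => {
  console.log('IE round %s started', newRoundIndex)
})
```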
219 changes: 118 additions & 101 deletions lib/round-tracker.js
@@ -1,226 +1,243 @@
import Sentry from '@sentry/node'
import { createMeridianContract } from './ie-contract.js'

// The number of tasks per round is proportionate to the SPARK round length - longer rounds require
// The number of tasks per round is proportionate to the module round length - longer rounds require
// more tasks per round.
//
// See https://www.notion.so/pl-strflt/SPARK-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643
// See https://www.notion.so/pl-strflt/module-tasking-v2-604e26d57f6b4892946525bcb3a77104?pvs=4#ded1cd98c2664a2289453d38e2715643
// for more details, this constant represents TC (tasks per committee).
//
// We will need to tweak this value based on measurements; that's why I put it here as a constant.
export const TASKS_PER_ROUND = 1000

// How many tasks is each SPARK checker node expected to complete every round (at most).
// How many tasks is each module node expected to complete every round (at most).
export const MAX_TASKS_PER_NODE = 15

/**
* @param {import('pg').Pool} pgPool
* @returns {() => {
* sparkRoundNumber: bigint;
* meridianContractAddress: string;
* meridianRoundIndex: bigint;
* moduleRoundNumbers: Map<number, bigint>;
* meridianContractAddresses: Map<number, string>;
* meridianRoundIndexes: Map<number, bigint>;
* }}
*/
export async function createRoundGetter (pgPool) {
const contract = await createMeridianContract()

let sparkRoundNumber, meridianContractAddress, meridianRoundIndex

const updateSparkRound = async (newRoundIndex) => {
meridianRoundIndex = BigInt(newRoundIndex)
meridianContractAddress = contract.address

const pgClient = await pgPool.connect()
try {
await pgClient.query('BEGIN')
sparkRoundNumber = await mapCurrentMeridianRoundToSparkRound({
meridianContractAddress,
meridianRoundIndex,
pgClient
})
await pgClient.query('COMMIT')
console.log('SPARK round started: %s', sparkRoundNumber)
} catch (err) {
await pgClient.query('ROLLBACK')
} finally {
pgClient.release()
const { rows: modules } = await pgPool.query(
'SELECT id, name, contract_address AS "contractAddress" FROM modules'
)
const moduleRoundNumbers = new Map()
const meridianContractAddresses = new Map()
const meridianRoundIndexes = new Map()

for (const mod of modules) {
meridianContractAddresses.set(mod.id, mod.contractAddress)
const contract = createMeridianContract(mod.contractAddress)

const updateModuleRound = async (newRoundIndex) => {
meridianRoundIndexes.set(mod.id, BigInt(newRoundIndex))

const pgClient = await pgPool.connect()
try {
await pgClient.query('BEGIN')
moduleRoundNumbers.set(mod.id, await mapCurrentMeridianRoundToModuleRound({
moduleId: mod.id,
meridianContractAddress: mod.contractAddress,
meridianRoundIndex: meridianRoundIndexes.get(mod.id),
pgClient
}))
await pgClient.query('COMMIT')
console.log('%s round started: %s', mod.name, moduleRoundNumbers.get(mod.id))
} catch (err) {
await pgClient.query('ROLLBACK')
} finally {
pgClient.release()
}
}
}

contract.on('RoundStart', (newRoundIndex) => {
updateSparkRound(newRoundIndex).catch(err => {
console.error('Cannot handle RoundStart:', err)
Sentry.captureException(err)
contract.on('RoundStart', (newRoundIndex) => {
updateModuleRound(newRoundIndex).catch(err => {
console.error('Cannot handle RoundStart:', err)
Sentry.captureException(err)
})
})
})

await updateSparkRound(await contract.currentRoundIndex())

await updateModuleRound(await contract.currentRoundIndex())
}

return () => ({
sparkRoundNumber,
meridianContractAddress,
meridianRoundIndex
moduleRoundNumbers,
meridianContractAddresses,
meridianRoundIndexes
})
}

/*
There are three cases we need to handle:
1. Business as usual - the IE contract advanced the round by one
2. Fresh start, e.g. a new spark-api instance is deployed, or we deploy this PR to an existing instance.
2. Fresh start, e.g. a new meridian-api instance is deployed, or we deploy this PR to an existing instance.
3. Upgrade of the IE contract
For each IE version (defined as the smart contract address), we are keeping track of three fields:
- `contractAddress`
- `sparkRoundOffset`
- `lastSparkRoundNumber`
- `moduleRoundOffset`
- `lastModuleRoundNumber`
Whenever a new IE round is started, we know the current IE round number (`meridianRoundIndex`)
Let me explain how the different cases are handled.
**Business as usual**
We want to map IE round number to SPARK round number. This assumes we have already initialised our
We want to map IE round number to module round number. This assumes we have already initialised our
DB for the current IE contract version we are working with.
```
sparkRoundNumber = meridianRoundIndex + sparkRoundOffset
moduleRoundNumber = meridianRoundIndex + moduleRoundOffset
```
For example, if we observe IE round 123, then `sparkRoundOffset` is `-122` and we calculate the
spark round as `123 + (-122) = 1`.
For example, if we observe IE round 123, then `moduleRoundOffset` is `-122` and we calculate the
module round as `123 + (-122) = 1`.
We update the record for the current IE contract address
to set `last_spark_round_number = sparkRoundNumber`.
to set `last_module_round_number = moduleRoundNumber`.
**Fresh start**
There is no record in our DB. We want to map the current IE round number to SPARK round 1. Also, we
want to setup `sparkRoundOffset` so that the algorithm above produces correct SPARK round numbers.
There is no record in our DB. We want to map the current IE round number to module round 1. Also, we
want to set up `moduleRoundOffset` so that the algorithm above produces correct module round numbers.
```
sparkRoundNumber = 1
sparkRoundOffset = sparkRoundNumber - meridianRoundIndex
moduleRoundNumber = 1
moduleRoundOffset = moduleRoundNumber - meridianRoundIndex
```
We insert a new record to our DB with the address of the current IE contract, `sparkRoundOffset`,
and `last_spark_round_number = sparkRoundNumber`.
We insert a new record to our DB with the address of the current IE contract, `moduleRoundOffset`,
and `last_module_round_number = moduleRoundNumber`.
**Upgrading IE contract**
We have one or more existing records in our DB. We know what is the last SPARK round that we
calculated from the previous version of the IE contract (`lastSparkRoundNumber`). We also know what
We have one or more existing records in our DB. We know the last module round number that we
calculated from the previous version of the IE contract (`lastModuleRoundNumber`). We also know
the round number of the new IE contract.
```
sparkRoundNumber = lastSparkRoundNumber + 1
sparkRoundOffset = sparkRoundNumber - meridianRoundIndex
moduleRoundNumber = lastModuleRoundNumber + 1
moduleRoundOffset = moduleRoundNumber - meridianRoundIndex
```
We insert a new record to our DB with the address of the current IE contract, `sparkRoundOffset`,
and `last_spark_round_number = sparkRoundNumber`.
We insert a new record to our DB with the address of the current IE contract, `moduleRoundOffset`,
and `last_module_round_number = moduleRoundNumber`.
If you are wondering how to find out what is the last SPARK round that we calculated from the
If you are wondering how to find the last module round number that we calculated from the
previous version of the IE contract - we can easily find it in our DB:
```sql
SELECT last_spark_round_number
SELECT last_module_round_number
FROM meridian_contract_versions
ORDER BY last_spark_round_number DESC
ORDER BY last_module_round_number DESC
LIMIT 1
```
*/

export async function mapCurrentMeridianRoundToSparkRound ({
export async function mapCurrentMeridianRoundToModuleRound ({
moduleId,
meridianContractAddress,
meridianRoundIndex,
pgClient
}) {
let sparkRoundNumber
let moduleRoundNumber

const { rows: [contractVersionOfPreviousSparkRound] } = await pgClient.query(
'SELECT * FROM meridian_contract_versions ORDER BY last_spark_round_number DESC LIMIT 1'
)
const { rows: [contractVersionOfPreviousModuleRound] } = await pgClient.query(`
SELECT * FROM meridian_contract_versions
WHERE module_id = $1
ORDER BY last_module_round_number DESC
LIMIT 1
`, [moduleId])

// More events coming from the same meridian contract
if (contractVersionOfPreviousSparkRound?.contract_address === meridianContractAddress) {
sparkRoundNumber = BigInt(contractVersionOfPreviousSparkRound.spark_round_offset) + meridianRoundIndex
if (contractVersionOfPreviousModuleRound?.contract_address === meridianContractAddress) {
moduleRoundNumber = BigInt(contractVersionOfPreviousModuleRound.module_round_offset) + meridianRoundIndex
await pgClient.query(
'UPDATE meridian_contract_versions SET last_spark_round_number = $1 WHERE contract_address = $2',
[sparkRoundNumber, meridianContractAddress]
'UPDATE meridian_contract_versions SET last_module_round_number = $1 WHERE contract_address = $2',
[moduleRoundNumber, meridianContractAddress]
)
console.log('Mapped %s IE round index %s to SPARK round number %s',
console.log('Mapped %s IE round index %s to module round number %s',
meridianContractAddress,
meridianRoundIndex,
sparkRoundNumber
moduleRoundNumber
)
} else {
// We are running for the first time and need to map the meridian round to spark round 1
// We are running for the first time and need to map the meridian round to module round 1
// Or the contract address has changed
const lastSparkRoundNumber = BigInt(contractVersionOfPreviousSparkRound?.last_spark_round_number ?? 0)
sparkRoundNumber = lastSparkRoundNumber + 1n
const sparkRoundOffset = sparkRoundNumber - meridianRoundIndex
const lastModuleRoundNumber = BigInt(contractVersionOfPreviousModuleRound?.last_module_round_number ?? 0)
moduleRoundNumber = lastModuleRoundNumber + 1n
const moduleRoundOffset = moduleRoundNumber - meridianRoundIndex

// TODO(bajtos) If we are reverting back to a contract address (version) we were
// using sometime in the past, the query above will fail. We can fix the problem and support
// this edge case by telling Postgres to ignore conflicts (`ON CONFLICT DO NOTHING`)
await pgClient.query(`
INSERT INTO meridian_contract_versions
(contract_address, spark_round_offset, last_spark_round_number, first_spark_round_number)
VALUES ($1, $2, $3, $3)
(contract_address, module_round_offset, last_module_round_number, first_module_round_number, module_id)
VALUES ($1, $2, $3, $3, $4)
`, [
meridianContractAddress,
sparkRoundOffset,
sparkRoundNumber
moduleRoundOffset,
moduleRoundNumber,
moduleId
])
console.log(
'Upgraded meridian contract from %s to %s, mapping IE round index %s to SPARK round number %s',
contractVersionOfPreviousSparkRound?.contract_address ?? '<n/a>',
'Upgraded meridian contract from %s to %s, mapping IE round index %s to module round number %s',
contractVersionOfPreviousModuleRound?.contract_address ?? '<n/a>',
meridianContractAddress,
meridianRoundIndex,
sparkRoundNumber
moduleRoundNumber
)
}

await maybeCreateSparkRound(pgClient, { sparkRoundNumber, meridianContractAddress, meridianRoundIndex })
await maybeCreateModuleRound(pgClient, { moduleRoundNumber, meridianContractAddress, meridianRoundIndex, moduleId })

return sparkRoundNumber
return moduleRoundNumber
}

export async function maybeCreateSparkRound (pgClient, {
sparkRoundNumber,
export async function maybeCreateModuleRound (pgClient, {
moduleRoundNumber,
meridianContractAddress,
meridianRoundIndex
meridianRoundIndex,
moduleId
}) {
const { rowCount } = await pgClient.query(`
INSERT INTO spark_rounds
(id, created_at, meridian_address, meridian_round, max_tasks_per_node)
VALUES ($1, now(), $2, $3, $4)
INSERT INTO module_round
(id, created_at, meridian_address, meridian_round, max_tasks_per_node, module_id)
VALUES ($1, now(), $2, $3, $4, $5)
ON CONFLICT DO NOTHING
`, [
sparkRoundNumber,
moduleRoundNumber,
meridianContractAddress,
meridianRoundIndex,
MAX_TASKS_PER_NODE
MAX_TASKS_PER_NODE,
moduleId
])

if (rowCount) {
// We created a new SPARK round. Let's define retrieval tasks for this new round.
// We created a new module round. Let's define retrieval tasks for this new round.
// This is a short- to medium-term solution until we move to fully decentralized tasking
await defineTasksForRound(pgClient, sparkRoundNumber)
await defineTasksForRound(pgClient, moduleRoundNumber, moduleId)
}
}

async function defineTasksForRound (pgClient, sparkRoundNumber) {
async function defineTasksForRound (pgClient, moduleRoundNumber, moduleId) {
await pgClient.query(`
INSERT INTO retrieval_tasks (round_id, cid, provider_address, protocol)
SELECT $1 as round_id, cid, provider_address, protocol
INSERT INTO retrieval_tasks (round_id, cid, provider_address, protocol, module_id)
SELECT $1 as round_id, cid, provider_address, protocol, module_id
FROM retrieval_templates
WHERE module_id = $3
ORDER BY random()
LIMIT $2;
`, [
sparkRoundNumber,
TASKS_PER_ROUND
moduleRoundNumber,
TASKS_PER_ROUND,
moduleId
])
}
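A worked example of the three mapping cases described in the comment block above; the numbers are illustrative and not part of this diff.

```js
// Business as usual: an offset is already stored for this contract version.
const meridianRoundIndex = 123n
const storedOffset = -122n
console.log(meridianRoundIndex + storedOffset) // 1n -> module round 1

// Fresh start: no record yet, so the current IE round becomes module round 1.
const freshRoundNumber = 1n
const freshOffset = freshRoundNumber - meridianRoundIndex
console.log(freshOffset) // -122n, stored as module_round_offset

// Contract upgrade: continue counting from the last known module round number.
const lastModuleRoundNumber = 42n
const upgradedRoundNumber = lastModuleRoundNumber + 1n // 43n
const upgradedOffset = upgradedRoundNumber - meridianRoundIndex
console.log(upgradedOffset) // -80n, so the next IE round 124 maps to module round 44
```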
66 changes: 66 additions & 0 deletions lib/spark.js
@@ -0,0 +1,66 @@
import { validate } from './validate.js'
import { satisfies } from 'compare-versions'
import assert from 'http-assert'

export const validateMeasurement = measurement => {
validate(measurement, 'sparkVersion', { type: 'string', required: false })
assert(
typeof measurement.sparkVersion === 'string' && satisfies(measurement.sparkVersion, '>=1.9.0'),
410, 'OUTDATED CLIENT'
)

validate(measurement, 'cid', { type: 'string', required: true })
validate(measurement, 'providerAddress', { type: 'string', required: true })
validate(measurement, 'protocol', { type: 'string', required: true })

validate(measurement, 'timeout', { type: 'boolean', required: false })
validate(measurement, 'startAt', { type: 'date', required: true })
validate(measurement, 'statusCode', { type: 'number', required: false })
validate(measurement, 'firstByteAt', { type: 'date', required: false })
validate(measurement, 'endAt', { type: 'date', required: false })
validate(measurement, 'byteLength', { type: 'number', required: false })
validate(measurement, 'attestation', { type: 'string', required: false })
validate(measurement, 'carTooLarge', { type: 'boolean', required: false })
validate(measurement, 'carChecksum', { type: 'string', required: false })
validate(measurement, 'indexerResult', { type: 'string', required: false })
}

export const sanitizeMeasurement = ({
measurement,
inetGroup
}) => ({
sparkVersion: measurement.sparkVersion,
zinniaVersion: measurement.zinniaVersion,
cid: measurement.cid,
providerAddress: measurement.providerAddress,
protocol: measurement.protocol,
participantAddress: measurement.participantAddress,
timeout: measurement.timeout || false,
startAt: parseOptionalDate(measurement.startAt),
statusCode: measurement.statusCode,
firstByteAt: parseOptionalDate(measurement.firstByteAt),
endAt: parseOptionalDate(measurement.endAt),
byteLength: measurement.byteLength,
attestation: measurement.attestation,
inetGroup,
carTooLarge: measurement.carTooLarge ?? false,
carChecksum: measurement.carChecksum,
indexerResult: measurement.indexerResult
})

/**
* Parse a date string field that may be `undefined` or `null`.
*
* - undefined -> undefined
* - null -> undefined
* - "iso-date-string" -> new Date("iso-date-string")
*
* @param {string | null | undefined} str
* @returns {Date | undefined}
*/
const parseOptionalDate = (str) => {
if (str === undefined || str === null) return undefined
return new Date(str)
}
9 changes: 9 additions & 0 deletions lib/voyager.js
@@ -0,0 +1,9 @@
import assert from 'http-assert'

export const validateMeasurement = measurement => {
assert(false, 501, 'Not implemented')
}

export const sanitizeMeasurement = () => {
assert(false, 501, 'Not implemented')
}
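Purely hypothetical sketch of how these stubs could be filled in later; the Voyager field names (`url`, `statusCode`, `endAt`) are invented here and are not defined anywhere in this PR.

```js
import { validate } from './validate.js'

// Hypothetical field names; the real Voyager measurement schema is not part of this PR.
export const validateMeasurement = measurement => {
  validate(measurement, 'url', { type: 'string', required: true })
  validate(measurement, 'statusCode', { type: 'number', required: false })
  validate(measurement, 'endAt', { type: 'date', required: false })
}

export const sanitizeMeasurement = ({ measurement, inetGroup }) => ({
  url: measurement.url,
  statusCode: measurement.statusCode,
  endAt: measurement.endAt ? new Date(measurement.endAt) : undefined,
  inetGroup
})
```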
29 changes: 29 additions & 0 deletions migrations/042.do.meridian-platform.sql
@@ -0,0 +1,29 @@
TRUNCATE TABLE measurements;

ALTER TABLE measurements
ADD COLUMN module_id INTEGER NOT NULL,
ADD COLUMN data TEXT NOT NULL,
DROP COLUMN participant_address,
DROP COLUMN finished_at,
DROP COLUMN start_at,
DROP COLUMN status_code,
DROP COLUMN first_byte_at,
DROP COLUMN end_at,
DROP COLUMN byte_length,
DROP COLUMN timeout,
DROP COLUMN attestation,
DROP COLUMN spark_version,
DROP COLUMN zinnia_version,
DROP COLUMN cid,
DROP COLUMN provider_address,
DROP COLUMN protocol,
DROP COLUMN inet_group,
DROP COLUMN car_too_large;

CREATE TABLE modules (
id SERIAL NOT NULL PRIMARY KEY,
name TEXT NOT NULL,
contract_address TEXT NOT NULL
);
INSERT INTO modules (id, name, contract_address) VALUES (0, 'SPARK', '0x8460766edc62b525fc1fa4d628fc79229dc73031');
INSERT INTO modules (id, name, contract_address) VALUES (1, 'Voyager', '0xc524b83bf85021e674a7c9f18f5381179fabaf6c');
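A minimal sketch (not part of the migration) of how a row in the slimmed-down `measurements` table is read back, assuming a local `DATABASE_URL`:

```js
import pg from 'pg'

const client = new pg.Client({ connectionString: process.env.DATABASE_URL })
await client.connect()

// All module-specific fields now live inside the JSON-encoded `data` column.
const { rows } = await client.query(
  'SELECT id, module_id, data, completed_at_round FROM measurements LIMIT 1'
)
if (rows.length > 0) {
  const [row] = rows
  console.log({ id: row.id, moduleId: row.module_id, ...JSON.parse(row.data) })
}

await client.end()
```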
1 change: 0 additions & 1 deletion spark-publish/ie-contract-config.js
@@ -2,7 +2,6 @@ import fs from 'node:fs/promises'
import { fileURLToPath } from 'node:url'

const {
IE_CONTRACT_ADDRESS = '0x8460766Edc62B525fc1FA4D628FC79229dC73031',
RPC_URLS = 'https://api.node.glif.io/rpc/v0,https://api.chain.love/rpc/v1',
GLIF_TOKEN
} = process.env
26 changes: 5 additions & 21 deletions spark-publish/index.js
@@ -11,26 +11,7 @@ export const publish = async ({
}) => {
// Fetch measurements
const { rows: measurements } = await pgPool.query(`
SELECT
id,
spark_version,
zinnia_version,
participant_address,
finished_at,
timeout,
start_at,
status_code,
first_byte_at,
end_at,
byte_length,
attestation,
inet_group,
car_too_large,
car_checksum,
indexer_result,
cid,
provider_address,
protocol
SELECT id, data
FROM measurements
LIMIT $1
`, [
@@ -49,7 +30,10 @@ export const publish = async ({
// Share measurements
let start = new Date()
const file = new File(
[measurements.map(m => JSON.stringify(m)).join('\n')],
[measurements.map(m => JSON.stringify({
...JSON.parse(m.data),
id: m.id,
})).join('\n')],
'measurements.ndjson',
{ type: 'application/json' }
)
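Illustrative only: how a single stored measurement becomes one line of `measurements.ndjson` after this change; the sample `data` payload below is made up.

```js
const m = { id: 42, data: '{"cid":"bafybeigdyrzt5example","protocol":"graphsync"}' }
const line = JSON.stringify({ ...JSON.parse(m.data), id: m.id })
console.log(line) // {"cid":"bafybeigdyrzt5example","protocol":"graphsync","id":42}
```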