Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,16 @@ export const MAX_VERIFICATION_RETRIES = +env.varOrDefault(
'5', // Maximum number of verification retry attempts
);

// Number of partitions the verification ID space is divided into.
export const VERIFICATION_PARTITION_COUNT = Number(
  env.varOrDefault('VERIFICATION_PARTITION_COUNT', '64'),
);

// Priority cutoff: IDs at or above this verification priority bypass
// partition filtering and are verified by every node.
export const VERIFICATION_PARTITION_THRESHOLD = Number(
  env.varOrDefault('VERIFICATION_PARTITION_THRESHOLD', '70'),
);

// Filter determining which ANS-104 bundles to unbundle
export const ANS104_UNBUNDLE_FILTER_PARSED = JSON.parse(
env.varOrDefault('ANS104_UNBUNDLE_FILTER', '{"never": true}'),
Expand Down
45 changes: 45 additions & 0 deletions src/lib/verification-partition.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/**
* AR.IO Gateway
* Copyright (C) 2022-2025 Permanent Data Solutions, Inc. All Rights Reserved.
*
* SPDX-License-Identifier: AGPL-3.0-or-later
*/
import crypto from 'node:crypto';

/**
 * Maps an arbitrary seed string to a stable partition index.
 *
 * The seed is hashed with SHA-256 so seeds spread evenly across the
 * partition space; the leading 4 digest bytes are interpreted as an
 * unsigned 32-bit big-endian integer and reduced modulo the count.
 *
 * @param partitionSeed - Arbitrary string identifying this node.
 * @param totalPartitions - Total number of partitions (must be > 0).
 * @returns Partition index in the range [0, totalPartitions).
 */
export function getNodePartition(
  partitionSeed: string,
  totalPartitions: number,
): number {
  const digest = crypto.createHash('sha256').update(partitionSeed).digest();
  // Leading 4 digest bytes as an unsigned 32-bit integer.
  const bucket = digest.readUInt32BE(0) % totalPartitions;
  return bucket;
}

/**
 * Maps a base64url-encoded ID to a partition index.
 *
 * For well-formed IDs (e.g. 32-byte base64url TX/data-item IDs) the
 * first 4 decoded bytes are interpreted as an unsigned 32-bit
 * big-endian integer and reduced modulo the partition count —
 * identical to the prior behavior. IDs that decode to fewer than 4
 * bytes (malformed or very short input — Buffer.from silently drops
 * invalid base64url characters) previously crashed with a RangeError
 * from readUInt32BE; they now fall back to hashing the raw ID string,
 * keeping the function total while still spreading such IDs evenly.
 *
 * @param id - Base64url-encoded identifier.
 * @param totalPartitions - Total number of partitions (must be > 0).
 * @returns Partition index in the range [0, totalPartitions).
 */
export function getIdPartition(id: string, totalPartitions: number): number {
  // Convert base64url ID to buffer
  const idBuffer = Buffer.from(id, 'base64url');
  if (idBuffer.length < 4) {
    // Defensive fallback: readUInt32BE throws on buffers shorter than
    // 4 bytes. Hash the raw string instead (same scheme as
    // getNodePartition) so malformed IDs are skipped, not fatal.
    const digest = crypto.createHash('sha256').update(id).digest();
    return digest.readUInt32BE(0) % totalPartitions;
  }
  // Use first 4 bytes as 32-bit integer
  const idInt = idBuffer.readUInt32BE(0);
  return idInt % totalPartitions;
}

/**
 * Decides whether this node is responsible for verifying the given ID.
 *
 * IDs whose verification priority meets or exceeds the threshold are
 * always verified (partition filtering is bypassed). All other IDs are
 * verified only when they hash into this node's partition.
 *
 * @param id - Base64url-encoded identifier to check.
 * @param nodePartition - Partition index assigned to this node.
 * @param totalPartitions - Total number of partitions.
 * @param priority - Optional verification priority for the ID.
 * @param partitionThreshold - Priority at or above which partition
 *   filtering is bypassed (defaults to 70).
 * @returns true when this node should verify the ID.
 */
export function shouldVerifyId(
  id: string,
  nodePartition: number,
  totalPartitions: number,
  priority?: number,
  partitionThreshold: number = 70,
): boolean {
  const bypassesPartitioning =
    priority !== undefined && priority >= partitionThreshold;
  if (bypassesPartitioning) {
    return true;
  }
  // Partition-filtered path: only handle IDs that land in our slice.
  return getIdPartition(id, totalPartitions) === nodePartition;
}

/**
 * Produces the seed used to assign this node to a partition.
 *
 * When a wallet address is supplied it is used directly, so the node's
 * partition stays stable across restarts; otherwise a random 32-byte
 * hex seed is generated (a fresh partition each process start).
 *
 * @param wallet - Optional wallet address used as a stable seed.
 * @returns The partition seed string.
 */
export function generatePartitionSeed(wallet?: string): string {
  if (wallet != null) {
    return wallet;
  }
  // No wallet configured: fall back to an ephemeral random seed.
  return crypto.randomBytes(32).toString('hex');
}
21 changes: 20 additions & 1 deletion src/system.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ import { currentUnixTimestamp } from './lib/time.js';
import log from './log.js';
import * as metrics from './metrics.js';
import { StreamingManifestPathResolver } from './resolution/streaming-manifest-path-resolver.js';
import {
generatePartitionSeed,
getNodePartition,
} from './lib/verification-partition.js';
import { FsChunkDataStore } from './store/fs-chunk-data-store.js';
import { FsDataStore } from './store/fs-data-store.js';
import {
Expand Down Expand Up @@ -193,6 +197,21 @@ if (config.CHUNK_METADATA_SOURCE_TYPE === 'legacy-psql') {

// Workers
export const eventEmitter = new EventEmitter();

// Calculate node's verification partition
// Seed comes from the configured wallet when available, keeping the
// partition stable across restarts; without a wallet a random seed is
// used, so the partition changes on every process start —
// NOTE(review): confirm that churn is acceptable for unwalleted nodes.
export const verificationPartitionSeed = generatePartitionSeed(
  config.AR_IO_WALLET,
);
// Map the seed onto one of VERIFICATION_PARTITION_COUNT partitions; the
// verification worker filters IDs against this partition.
export const nodeVerificationPartition = getNodePartition(
  verificationPartitionSeed,
  config.VERIFICATION_PARTITION_COUNT,
);

// Log the assignment at startup so operators can see which slice of
// the ID space this node verifies and how the seed was derived.
log.info('Node verification partition assigned', {
  partition: nodeVerificationPartition,
  totalPartitions: config.VERIFICATION_PARTITION_COUNT,
  seedSource: config.AR_IO_WALLET !== undefined ? 'wallet' : 'random',
});
eventEmitter.setMaxListeners(100);

export const blockImporter = new BlockImporter({
Expand Down Expand Up @@ -468,7 +487,7 @@ export const chunkDataFsCacheCleanupWorker =
config.ENABLE_CHUNK_DATA_CACHE_CLEANUP
? new FsCleanupWorker({
log,
basePath: 'data/chunks/by-dataroot',
basePath: 'data/chunks/data/by-dataroot',
dataType: 'chunk_data',
shouldDelete: async (path) => {
try {
Expand Down
58 changes: 53 additions & 5 deletions src/workers/data-verification.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import {
} from '../types.js';
import { DataRootComputer } from '../lib/data-root.js';
import * as config from '../config.js';
import { shouldVerifyId } from '../lib/verification-partition.js';
import { nodeVerificationPartition } from '../system.js';

export type QueueBundleResponse = {
status: 'skipped' | 'queued' | 'error';
Expand Down Expand Up @@ -113,22 +115,68 @@ export class DataVerificationWorker {
}

const dataIds = await this.contiguousDataIndex.getVerifiableDataIds();
const rootTxIds: string[] = [];
const rootTxIdsToVerify: string[] = [];
const skippedDataIds: string[] = [];

for (const dataId of dataIds) {
const rootTxId = await this.dataItemRootTxIndex.getRootTxId(dataId);
const rootIdToCheck = rootTxId ?? dataId;

if (rootTxId !== undefined && !rootTxIds.includes(rootTxId)) {
rootTxIds.push(rootTxId);
// Get data attributes to check priority
const dataAttributes =
await this.contiguousDataIndex.getDataAttributes(dataId);
const priority = dataAttributes?.verificationPriority;

// Apply partition filtering
const shouldVerify = shouldVerifyId(
rootIdToCheck,
nodeVerificationPartition,
config.VERIFICATION_PARTITION_COUNT,
priority,
config.VERIFICATION_PARTITION_THRESHOLD,
);

if (shouldVerify) {
if (!rootTxIdsToVerify.includes(rootIdToCheck)) {
rootTxIdsToVerify.push(rootIdToCheck);
}
} else {
// Track skipped IDs to increment retry count
skippedDataIds.push(dataId);
}
}

// Increment retry count for skipped IDs
for (const dataId of skippedDataIds) {
try {
await this.contiguousDataIndex.incrementVerificationRetryCount(dataId);
log.debug('Skipped verification due to partition filter', {
dataId,
nodePartition: nodeVerificationPartition,
});
} catch (error: any) {
log.error('Error incrementing retry count for skipped ID', {
dataId,
error: error.message,
});
}
}

// Queue only the IDs that passed partition filtering
const queuedItems = this.queue.getQueue();
for (const rootTxId of rootTxIds) {
if (rootTxId !== undefined && !queuedItems.includes(rootTxId)) {
for (const rootTxId of rootTxIdsToVerify) {
if (!queuedItems.includes(rootTxId)) {
log.debug('Queueing data ID for verification.', { id: rootTxId });
this.queue.push(rootTxId);
}
}

log.info('Data verification queue updated', {
totalDataIds: dataIds.length,
queued: rootTxIdsToVerify.length,
skipped: skippedDataIds.length,
nodePartition: nodeVerificationPartition,
});
}

async verifyDataRoot(id: string): Promise<boolean> {
Expand Down
Loading