Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
e9f7d0e
initial commit to create DB for ledgers
ckeshava Aug 21, 2025
b0ab1e0
add unit tests to the validated_ledgers table setup and insertion ops
ckeshava Aug 21, 2025
e9cfe98
add API endpoint support for retrieving validated ledgers
ckeshava Aug 21, 2025
b453d0d
move the pruning logic into connection_manager file to prevent jest o…
ckeshava Aug 21, 2025
3206c8a
expose API endpoints for missing ledgers table
ckeshava Aug 21, 2025
51411e7
improved documentation
ckeshava Aug 21, 2025
ecf914b
fix linter errors and warnings
ckeshava Aug 22, 2025
7c5c96f
address PR comments
ckeshava Aug 22, 2025
98b148b
remove the unnecessary async keyword
ckeshava Aug 22, 2025
ec29db5
include validation_public_keys associated with every validated ledger
ckeshava Aug 29, 2025
01b381e
remove exposure of new API endpoints
ckeshava Aug 29, 2025
986698f
feat: store ledger_time in UTC format
ckeshava Aug 29, 2025
197929b
remove references to missing-ledgers table
ckeshava Aug 29, 2025
9bf4e1c
fix the chains failing test
ckeshava Aug 29, 2025
4078df4
fix faling test; record all validated-ledgers from non-mainnet UNL
ckeshava Aug 29, 2025
84740c0
address PR comments
ckeshava Sep 4, 2025
a122679
Merge branch 'main' into trackLedgers
ckeshava Sep 4, 2025
94abe5f
address additional PR comments from Raj
ckeshava Sep 4, 2025
a24ac13
Merge branch 'trackLedgers' of https://github.com/ckeshava/validator-…
ckeshava Sep 4, 2025
8208f4e
Merge branch 'main' into trackLedgers
ckeshava Sep 4, 2025
30e3c9d
fix: use Set while collecting validation public keys
ckeshava Sep 26, 2025
98cc0aa
Merge branch 'trackLedgers' of https://github.com/ckeshava/validator-…
ckeshava Sep 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions ARCHITECTURE.md
Original file line number Diff line number Diff line change
Expand Up @@ -198,5 +198,18 @@ This table keeps track of the WebSocket connection status for all networks.
| `connected` |Boolean denoting websocket connection status. |
| `status_update_time` |Time when the connected column was updated. |

### `validated_ledgers`
Copy link
Collaborator

@Patel-Raj11 Patel-Raj11 Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets update this after all comments are addressed.


This table keeps track of all validated ledgers received from streaming subscriptions across different networks.

| Key | Definition |
|-----------------|-------------------------------------------------------------------------|
| `network` | The network on which the ledger was validated. |
| `ledger_hash` | The hash of the validated ledger. |
| `ledger_index` | The index of the validated ledger. |
| `ledger_time` | The close time of the ledger (converted to datetime). |
| `txn_count` | The number of transactions in this ledger. |
| `received_at` | The time when this ledgerClosed message was received by the VHS. |

*Partial validations are not meant to vote for any particular ledger. A partial validation indicates that the validator is still online but not keeping up with consensus.
**A chain is a group of validators validating the same set of ledgers. `main`, `test`, and `dev` represent the validated versions of mainnet, testnet, and devnet respectively. Validators on a fork/validating an alternate version of the ledger will have a different value, usually of the form `chain.[num]`.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
"startCrawlerDev": "ts-node-dev --transpile-only ./src/crawler/index.ts | ./node_modules/.bin/bunyan ",
"startApiDev": "ts-node-dev --transpile-only ./src/api/index.ts | ./node_modules/.bin/bunyan",
"startConnectionsDev": "ts-node-dev --transpile-only ./src/connection-manager/index.ts | ./node_modules/.bin/bunyan",
"test": "jest",
"test": "jest --detectOpenHandles",
"prepublishOnly": "npm run build"
},
"files": [
Expand Down
29 changes: 2 additions & 27 deletions src/connection-manager/agreement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,11 @@ import {
ValidationRaw,
ValidatorKeys,
Ballot,
Chain,
} from '../shared/types'
import { getLists, overlaps } from '../shared/utils'
import logger from '../shared/utils/logger'

import chains from './chains'
import getNetworkNameFromChainId from './utils'

const log = logger({ name: 'agreement' })

Expand Down Expand Up @@ -122,30 +121,6 @@ function isPreceedingFlagLedger(ledger_index: string): boolean {
return parseInt(ledger_index, 10) % 256 === 255
}

/**
* Finds network name from chain id.
*
* @param chain - A chain object.
* @returns String.
*/
async function getNetworkNameFromChainId(chain: Chain): Promise<string> {
let id = chain.id
const lists = await getLists().catch((err) => {
log.error('Error getting validator lists', err)
return undefined
})

if (lists != null) {
Object.entries(lists).forEach(([network, set]) => {
if (overlaps(chain.validators, set)) {
id = network
}
})
}

return id
}

/**
*
*/
Expand Down Expand Up @@ -174,7 +149,7 @@ class Agreement {
log.info('Calculating agreement scores')
const promises = []

const agreementChains = chains.calculateChainsFromLedgers()
const agreementChains = await chains.calculateChainsFromLedgers()

for (const chain of agreementChains) {
const ledger_hashes = chain.ledgers
Expand Down
33 changes: 24 additions & 9 deletions src/connection-manager/chains.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { Knex } from 'knex'

import { query } from '../shared/database'
import { insertValidations } from '../shared/database/validatedLedgers'
import { Ledger, ValidationRaw, Chain } from '../shared/types'
import { getLists, overlaps } from '../shared/utils'
import logger from '../shared/utils/logger'

import getNetworkNameFromChainId from './utils'

const log = logger({ name: 'chains' })
/**
* Helper to sort chains by chain length.
Expand All @@ -18,12 +21,18 @@ function sortChainLength(chain1: Chain, chain2: Chain): number {
}

/**
* Adds ledger information to chain.
* Adds ledger information to chain. Furthermore, it inserts validation_public_keys into the validated_ledgers table.
*
* @param ledger - Ledger to add to chain.
* @param chain - Chain to update.
*/
function addLedgerToChain(ledger: Ledger, chain: Chain): void {
async function addLedgerToChain(ledger: Ledger, chain: Chain): Promise<void> {
await insertValidations(
ledger.ledger_hash,
ledger.ledger_index,
Array.from(ledger.validations),
await getNetworkNameFromChainId(chain),
)
chain.ledgers.add(ledger.ledger_hash)
for (const validator of ledger.validations) {
chain.validators.add(validator)
Expand Down Expand Up @@ -102,7 +111,7 @@ class Chains {
*
* @returns List of chains being monitored by the system.
*/
public calculateChainsFromLedgers(): Chain[] {
public async calculateChainsFromLedgers(): Promise<Chain[]> {
const list = []
const now = Date.now()

Expand All @@ -121,7 +130,7 @@ class Chains {
list.sort((ledger1, ledger2) => ledger1.ledger_index - ledger2.ledger_index)

for (const ledger of list) {
this.updateChains(ledger)
await this.updateChains(ledger)
}

return this.chains
Expand Down Expand Up @@ -166,7 +175,7 @@ class Chains {
*
* @param ledger - Ledger being validated on a new chain.
*/
private addNewChain(ledger: Ledger): void {
private async addNewChain(ledger: Ledger): Promise<void> {
const current = ledger.ledger_index
const validators = ledger.validations
const ledgerSet = new Set([ledger.ledger_hash])
Expand All @@ -183,14 +192,20 @@ class Chains {

log.info(`Added new chain, chain.${chain.id}`)
this.chains.push(chain)
await insertValidations(
ledger.ledger_hash,
ledger.ledger_index,
Array.from(ledger.validations),
await getNetworkNameFromChainId(chain),
)
}

/**
* Updates Chains as ledgers are parsed.
*
* @param ledger - The Ledger being handled in order to update the chains.
*/
private updateChains(ledger: Ledger): void {
private async updateChains(ledger: Ledger): Promise<void> {
const next = ledger.ledger_index
const validators = ledger.validations

Expand All @@ -203,7 +218,7 @@ class Chains {
.shift()

if (chainAtNextIndex !== undefined) {
addLedgerToChain(ledger, chainAtNextIndex)
await addLedgerToChain(ledger, chainAtNextIndex)
return
}

Expand Down Expand Up @@ -232,15 +247,15 @@ class Chains {
log.warn(`Possibly skipped ${skipped} ledgers`)
if (skipped > 1 && skipped < 20) {
chainWithThisValidator.incomplete = true
addLedgerToChain(ledger, chainWithThisValidator)
await addLedgerToChain(ledger, chainWithThisValidator)
}
}

if (chainWithThisValidator !== undefined || chainWithLedger !== undefined) {
return
}

this.addNewChain(ledger)
await this.addNewChain(ledger)
}
}

Expand Down
21 changes: 21 additions & 0 deletions src/connection-manager/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -233,5 +233,26 @@ export default async function startConnections(): Promise<void> {
setInterval(() => {
void backtrackAmendmentStatus()
}, BACKTRACK_INTERVAL)

const ONE_DAY_IN_MILLISECONDS = 1000 * 60 * 60 * 24
setInterval(pruneValidatedLedgersTable, ONE_DAY_IN_MILLISECONDS)
}
}

// delete all validated ledgers older than 30 days.
async function pruneValidatedLedgersTable(): Promise<void> {
const cutoffDate = new Date()
cutoffDate.setUTCHours(0, 0, 0, 0)
cutoffDate.setUTCDate(cutoffDate.getUTCDate() - 30)

try {
const deletedRows: number = await query('validated_ledgers')
.where('ledger_time', '<', cutoffDate)
.del()
log.info(
`Pruned ${deletedRows} rows from validated ledgers table. Deleted ledgers were older than ${cutoffDate.toISOString()}`,
)
} catch (err: unknown) {
log.error(`Error pruning validated ledgers table`, err)
}
}
31 changes: 31 additions & 0 deletions src/connection-manager/utils.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { Chain } from '../shared/types'
import { getLists, overlaps } from '../shared/utils'
import logger from '../shared/utils/logger'

const log = logger({ name: 'connection-manager-utils' })

/**
* Finds network name from chain id.
*
* @param chain - A chain object.
* @returns String.
*/
export default async function getNetworkNameFromChainId(
chain: Chain,
): Promise<string> {
let id = chain.id
const lists = await getLists().catch((err) => {
log.error('Error getting validator lists', err)
return undefined
})

if (lists != null) {
Object.entries(lists).forEach(([network, set]) => {
if (overlaps(chain.validators, set)) {
id = network
}
})
}

return id
}
4 changes: 4 additions & 0 deletions src/connection-manager/wsHandling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import {
NETWORKS_HOSTS,
deleteAmendmentStatus,
} from '../shared/database/amendments'
import { insertValidatedLedger } from '../shared/database/validatedLedgers'
import {
AmendmentStatus,
FeeVote,
Expand Down Expand Up @@ -110,7 +111,10 @@ export async function handleWsMessageSubscribeTypes(
} else if (data.type.includes('ledger')) {
const current_ledger = data as StreamLedger
ledger_hashes.push(current_ledger.ledger_hash)

if (networks) {
await insertValidatedLedger(networks, current_ledger)
Copy link
Collaborator

@Patel-Raj11 Patel-Raj11 Sep 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we are not de-duplicating when hitting database. Not sure if this will lead to deadlock.
try catch in insertValidatedLedger would prevent node.js terminating, but not the deadlock if it ever occurs.

If we want to keep this as it is, I would suggest deploying this branch on staging/dev and then deploying it in production if it works well.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

On line 33, I'm checking against for duplicates against the primary-key of the db. Given that these 3 columns make the primary-key, I expect this operation to be optimized by the DB.

Besides, I'm not executing any other instruction in that method. Hence, there is no possibility of ungraceful-closure of the db operation (or) leakage of DB-connections in this method.

Sure, we can observe in dev/stg environment for a few days.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not blocking the PR review: I would recommend testing it in staging/dev for a few days.


const fee: FeeVote = {
fee_base: current_ledger.fee_base,
reserve_base: current_ledger.reserve_base,
Expand Down
21 changes: 21 additions & 0 deletions src/shared/database/setup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ export default async function setupTables(): Promise<void> {
await setupBallotTable()
await addAmendmentsDataFromJSON()
await setupConnectionHealthTable()
await setupValidatedLedgersTable()
}

async function setupCrawlsTable(): Promise<void> {
Expand Down Expand Up @@ -278,3 +279,23 @@ async function setupConnectionHealthTable(): Promise<void> {
})
}
}

export async function setupValidatedLedgersTable(): Promise<void> {
const hasTable = await db().schema.hasTable('validated_ledgers')
if (!hasTable) {
await db().schema.createTable('validated_ledgers', (table) => {
table.string('network').notNullable()
table.string('ledger_hash').notNullable()
table.bigInteger('ledger_index').notNullable()
// Note: The VHS transforms the value from Ripple-Epoch-Time to UTC format
table.dateTime('ledger_time').notNullable()
table.bigInteger('fee_base').notNullable()
table.bigInteger('reserve_base').notNullable()
table.bigInteger('reserve_inc').notNullable()
table.integer('txn_count')
table.specificType('validation_public_keys', 'TEXT[]')
table.dateTime('received_at').defaultTo(db().fn.now())
table.primary(['ledger_index', 'network', 'ledger_hash'])
})
}
}
Loading
Loading