Skip to content
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 34 additions & 0 deletions src/connection-manager/agreement.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@ import {
ValidationRaw,
ValidatorKeys,
Ballot,
Chain,
} from '../shared/types'
import { getLists, overlaps } from '../shared/utils'
import logger from '../shared/utils/logger'

import chains from './chains'
Expand Down Expand Up @@ -120,6 +122,30 @@ function isPreceedingFlagLedger(ledger_index: string): boolean {
return parseInt(ledger_index, 10) % 256 === 255
}

/**
* Finds network name from chain id.
*
* @param chain - A chain object.
* @returns String.
*/
async function getNetworkNameFromChainId(chain: Chain): Promise<string> {
let id = chain.id
const lists = await getLists().catch((err) => {
log.error('Error getting validator lists', err)
return undefined
})

if (lists != null) {
Object.entries(lists).forEach(([network, set]) => {
if (overlaps(chain.validators, set)) {
id = network
}
})
}

return id
}

/**
*
*/
Expand Down Expand Up @@ -153,6 +179,14 @@ class Agreement {
for (const chain of agreementChains) {
const ledger_hashes = chain.ledgers

const networkName = await getNetworkNameFromChainId(chain)

log.info(
`Agreement: ${chain.id}:${networkName}:${Array.from(
chain.validators,
).join(',')}`,
)

for (const signing_key of chain.validators) {
promises.push(
this.calculateValidatorAgreement(
Expand Down
56 changes: 34 additions & 22 deletions src/connection-manager/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ const ports = [443, 80, 6005, 6006, 51233, 51234]
const protocols = ['wss://', 'ws://']
const connections: Map<string, WebSocket> = new Map()
const networkFee: Map<string, FeeVote> = new Map()
const validationNetworkDb: Map<string, string> = new Map()
const CM_INTERVAL = 60 * 60 * 1000
const WS_TIMEOUT = 10000
const REPORTING_INTERVAL = 15 * 60 * 1000
Expand Down Expand Up @@ -60,11 +61,11 @@ async function setHandlers(
isInitialNode = false,
retryCount = 0,
): Promise<void> {
log.info(
`Initiated Websocket connection for: ${ws_url} on ${
networks ?? 'unknown network'
}`,
)
// log.info(
// `Initiated Websocket connection for: ${ws_url} on ${
// networks ?? 'unknown network'
// }`,
// )

const ledger_hashes: string[] = []
return new Promise(function setHandlersPromise(resolve, _reject) {
Expand Down Expand Up @@ -119,24 +120,25 @@ async function setHandlers(
networks,
networkFee,
ws,
validationNetworkDb,
)
}
})
ws.on('close', async (code, reason) => {
log.error(
`Websocket closed for ${ws.url} on ${
networks ?? 'unknown network'
} with code ${code} and reason ${reason.toString('utf-8')}.`,
)
ws.on('close', async (code) => {
// log.error(
// `Websocket closed for ${ws.url} on ${
// networks ?? 'unknown network'
// } with code ${code} and reason ${reason.toString('utf-8')}.`,
// )

const delay = BASE_RETRY_DELAY * 2 ** retryCount

if (CLOSING_CODES.includes(code) && delay <= MAX_RETRY_DELAY) {
log.info(
`Reconnecting to ${ws.url} on ${
networks ?? 'unknown network'
} after ${delay}ms...`,
)
// log.info(
// `Reconnecting to ${ws.url} on ${
// networks ?? 'unknown network'
// } after ${delay}ms...`,
// )
// Clean up the old Websocket connection
connections.delete(ws.url)
ws.terminate()
Expand Down Expand Up @@ -166,12 +168,12 @@ async function setHandlers(
ws.terminate()
resolve()
})
ws.on('error', (err) => {
log.error(
`Websocket connection error for ${ws.url} on ${
networks ?? 'unknown network'
} - ${err.message}`,
)
ws.on('error', () => {
// log.error(
// `Websocket connection error for ${ws.url} on ${
// networks ?? 'unknown network'
// } - ${err.message}`,
// )

if (connections.get(ws.url)?.url === ws.url) {
connections.delete(ws.url)
Expand Down Expand Up @@ -230,13 +232,23 @@ async function findConnection(node: WsNode): Promise<void> {
return Promise.resolve()
}

async function getValidationNetworkDb(): Promise<void> {
const validatorNetwork: Array<{ signing_key: string; networks: string }> =
await query('validators').select('signing_key', 'networks')
for (const entry of validatorNetwork) {
validationNetworkDb.set(entry.signing_key, entry.networks)
}
}

/**
* Creates connections to nodes found in the database.
*
* @returns A promise that resolves to void once all possible connections have been created.
*/
async function createConnections(): Promise<void> {
log.info('Finding Connections...')
validationNetworkDb.clear()
await getValidationNetworkDb()
Comment on lines +244 to +245
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The createConnections method is executed every CM_INTERVAL (1 hour). I'm concerned about the staleness of the data in the validationNetworkDb in-memory map.

New entries into the validator table are inserted in the saveValidators method. The saveValidator method is invoked upon receiving every new validation.

This is a hypothetical scenario: When we start the VHS connection service, we are not aware of many validators. However, we populate the validationNetworkDb in-memory map and it will not be updated for the next hour.

In the meantime, we can receive validations from new validators which are not present in our validationNetworkDb map. In the current code, we will mis-place these validations into the wrong network.

We can remedy this situation by falling-back to do a fresh db access, if we don't have any entry in validationNetworkDb in-memory map. Alternatively, we can run the createConnections method much more frequently.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a fallback to database if we don't find in cache.

Copy link
Collaborator Author

@pdp2121 pdp2121 Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validationNetworkDb map is only used to fetch the network default fee voting data in case the current validation ledger_hash is not seen in the previous 10 hashes. It is not being used as source of truth for networks

const tenMinutesAgo = new Date()
tenMinutesAgo.setMinutes(tenMinutesAgo.getMinutes() - 10)

Expand Down
19 changes: 5 additions & 14 deletions src/connection-manager/wsHandling.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,13 @@ import {
import { AMENDMENTS_ID } from 'xrpl/dist/npm/models/ledger'
import { LedgerResponseExpanded } from 'xrpl/dist/npm/models/methods/ledger'

import {
query,
saveAmendmentStatus,
saveAmendmentsStatus,
} from '../shared/database'
import { saveAmendmentStatus, saveAmendmentsStatus } from '../shared/database'
import {
NETWORKS_HOSTS,
deleteAmendmentStatus,
} from '../shared/database/amendments'
import {
AmendmentStatus,
DatabaseValidator,
FeeVote,
StreamLedger,
StreamManifest,
Expand Down Expand Up @@ -106,6 +101,7 @@ function isFlagLedgerPlusOne(ledger_index: number): boolean {
* @param networks - The networks of subscribed node.
* @param network_fee - The map of default fee for the network to be used in case the validator does not vote for a new fee.
* @param ws - The WebSocket message received from.
* @param validationNetworkDb -- The validation network map to fetch fee data.
* @returns Void.
*/
// eslint-disable-next-line max-params -- Disabled for this function.
Expand All @@ -115,22 +111,17 @@ export async function handleWsMessageSubscribeTypes(
networks: string | undefined,
network_fee: Map<string, FeeVote>,
ws: WebSocket,
validationNetworkDb: Map<string, string>,
): Promise<void> {
if (data.type === 'validationReceived') {
const validationData = data as ValidationRaw
if (ledger_hashes.includes(validationData.ledger_hash)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This ledger_hashes acts as a "cache" when VHS receives validations relevant to a network. We only need to perform the database query when we receive the "validation" before the "ledger". (Check the handling of ledgerStream below)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ledger_hashes is maintained in-memory for every ws-connection. See the implementation of setHandlers() method.

Copy link
Contributor

@Patel-Raj Patel-Raj Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we rely on the networks field passed to export async function handleWsMessageSubscribeTypes( to compute the fees? It would completely eliminate need to store the validationNetworkDb in-memory map and database access. In which case that networks fields would be inconsistent?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are picking up the ws_urls to open up websocket connection from crawls table. And this piece of code makes sure that at least some network is set. So, I wanted to know if we can remove the networks check in the first place and assume that it would always be defined.


const promises = []
  const networks = await getNetworks()
  for (const network of networks) {
    const crawler = new Crawler()
    promises.push(crawler.crawl(network))
  }

Also, we can pick only those ws_urls where networks is not null from the existing database:

const nodes = await query('crawls')
    .select(['ip', 'ws_url', 'networks'])
    .whereNotNull('ip')
    .andWhere('start', '>', tenMinutesAgo)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Patel-Raj

Can we rely on the networks field passed to export async function handleWsMessageSubscribeTypes( to compute the fees? It would completely eliminate need to store the validationNetworkDb in-memory map and database access. In which case that networks fields would be inconsistent?

Yes, this is the current fall-back mechanism of the code. This is the else condition if the database does not have the network entry.

We are picking up the ws_urls to open up websocket connection from crawls table. And this piece of code makes sure that at least some network is set. So, I wanted to know if we can remove the networks check in the first place and assume that it would always be defined.

Are you suggesting that we use the crawls table to fetch this network information? We can use it, however it only changes the database read access from validators to the crawls table. It doesn't reduce the load on the db per se.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need to fallback? Can't we just use the networks passed in the handleWsMessageSubscribeTypes method as source of truth?

It is possible for one rippled validator to be configured for multiple networks, for instance main and dev. Look at this comment for more info.

In such cases, I suspect one websocket connection could serve ledgers, validations pertaining to multiple networks. That is the only reason why we cannot use the networks input parameter to this function.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But, #329 will not go to fetch the network from database, it could remain undefined.

The networks value is not undefined. We are using the validation to read the networks value with a fall-back on the function input parameter.

I have identified one scenario where this code does not work correctly, however I don't know of many such cases. I believe its a worthy optimization to get the VHS back on track.

Copy link
Contributor

@Patel-Raj Patel-Raj Apr 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation gets its network from the function input parameter.


if (ledger_hashes.includes(validationData.ledger_hash)) {
      validationData.networks = networks
    }

It does not implicitly have network field. It would either remain undefined or get it from the function parameter.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the original interface:

interface ValidationRaw {
  flags: number
  full: boolean
  ledger_hash: string
  ledger_index: string
  master_key: string
  signature: string
  signing_time: number
  type: string
  validation_public_key: string
  server_version?: string

  networks?: string

  amendments?: string[]
  base_fee?: number
  reserve_base?: number
  reserve_inc?: number
  ledger_fee?: FeeVote

  // The validation_public_key is the same as the signing_key in StreamManifest
}

validationData does implicitly contain an optional network field. If this ledger_hash hasn't been encountered in recent hashes (as evidenced by the if-condition), then we are not using the networks function parameter.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Patel-Raj we cannot use the networks field passed in to populate the networks for validations since sometimes a node from one network can receive validation message on different networks. That was why we had to use past 10 ledger hashes as source of truth

validationData.networks = networks
}

// Get network of the validation if ledger_hash is not in cache.
const validationNetworkDb: DatabaseValidator | undefined = await query(
'validators',
)
.select('*')
.where('signing_key', validationData.validation_public_key)
.first()
const validationNetwork =
validationNetworkDb?.networks ?? validationData.networks
validationNetworkDb.get(validationData.validation_public_key) ??
validationData.networks

// Get the fee for the network to be used in case the validator does not vote for a new fee.
if (validationNetwork) {
Expand Down