Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions src/connection-manager/connections.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
// 30 minutes, in milliseconds.
// NOTE(review): presumably the period of the amendment backtracking job — confirm at its use site.
const BACKTRACK_INTERVAL = 30 * 60 * 1000
// 1 second, in milliseconds.
// NOTE(review): presumably the initial reconnect delay — confirm at its use site.
const BASE_RETRY_DELAY = 1 * 1000
// 30 seconds, in milliseconds.
// NOTE(review): presumably the upper bound for the reconnect delay — confirm at its use site.
const MAX_RETRY_DELAY = 30 * 1000
// 10 minutes, in milliseconds: period of the heartbeat sweep scheduled with setInterval in this file.
const WS_HEARTBEAT_CHECK_INTERVAL = 10 * 60 * 1000
// 60 seconds, in milliseconds: a connection whose last heartbeat is older than this is reported as stale.
const WS_HEARTBEAT_TIMEOUT = 60 * 1000

// The frequent closing codes seen so far after connections established include:
// 1008: Policy error: client is too slow. (Most frequent)
Expand All @@ -46,6 +48,8 @@
// WebSocket close codes singled out by this module.
// NOTE(review): how they are acted on is outside this excerpt — confirm in the 'close' handler.
const CLOSING_CODES = [1005, 1006, 1008]
// NOTE(review): presumably guards against starting the connection manager twice — confirm at start-up code.
let cmStarted = false

// Time of the most recent activity per WebSocket, keyed by `ws.url`.
// Written on 'open' and on every 'message'; removed on 'close', on 'error' and by the periodic sweep.
const webSocketHeartbeat: Map<string, Date> = new Map()

/**
* Sets the handlers for each WebSocket object.
*
Expand Down Expand Up @@ -79,9 +83,11 @@
})
subscribe(ws)

webSocketHeartbeat.set(ws.url, new Date())
resolve()
})
ws.on('message', function handleMessage(message: string) {
webSocketHeartbeat.set(ws.url, new Date())
let data
try {
data = JSON.parse(message)
Expand All @@ -101,6 +107,7 @@
)
})
ws.on('close', async (code) => {
webSocketHeartbeat.delete(ws.url)
void updateConnectionHealthStatus(ws.url, false)
ws.terminate()

Expand All @@ -120,13 +127,35 @@
resolve()
})
ws.on('error', () => {
webSocketHeartbeat.delete(ws.url)
void updateConnectionHealthStatus(ws.url, false)
ws.terminate()
resolve()
})
})
}

/**
 * Periodic heartbeat sweep over every tracked WebSocket connection.
 *
 * Every WS_HEARTBEAT_CHECK_INTERVAL milliseconds this logs how long ago each
 * connection last recorded a heartbeat. A connection that has been silent for
 * more than WS_HEARTBEAT_TIMEOUT milliseconds is reported as stale, marked
 * unhealthy through updateConnectionHealthStatus and dropped from the map.
 */
setInterval(() => {
  // One timestamp for the whole sweep, so every connection is judged against
  // the same instant and the logged age matches the staleness decision.
  const now = Date.now()
  let staleConnections = 0

  log.info(
    `WS INFO: Total number of live WS connections: ${webSocketHeartbeat.size}`,
  )

  // Deleting the current entry while iterating a Map is well-defined.
  for (const [url, heartbeat] of webSocketHeartbeat.entries()) {
    const silentForMs = now - heartbeat.getTime()
    log.info(
      `WS INFO: heartbeat for: ${url} last received at: ${silentForMs}ms ago`,
    )

    // Stale means silent for more than 60 seconds (roughly 15 missed
    // validated ledgers).
    if (silentForMs > WS_HEARTBEAT_TIMEOUT) {
      log.error(
        `WS ERROR: stale connection for: ${url} last received a message at: ` +
          `${heartbeat.toISOString()} (${silentForMs / 1000} seconds ago). ` +
          `Marking this WS connection as disconnected.`,
      )
      staleConnections += 1

      // Note: the socket itself is NOT terminated here, because only its URL
      // is tracked. It is possible that the JavaScript runtime never
      // garbage-collects these stale sockets. As future work, record the `ws`
      // instance as well and terminate stale sockets explicitly.
      void updateConnectionHealthStatus(url, false)
      webSocketHeartbeat.delete(url)
    }
  }

  // Only raise an error-level line when something is actually wrong.
  if (staleConnections > 0) {
    log.error(
      `WS ERROR: Total number of stale connections: ${staleConnections}`,
    )
  } else {
    log.info('WS INFO: Total number of stale connections: 0')
  }
}, WS_HEARTBEAT_CHECK_INTERVAL)

/**
* Tries to find a valid WebSockets endpoint for a node.
*
Expand Down
149 changes: 149 additions & 0 deletions test/connections/websocket-heartbeat.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
// test/connections/websocket-heartbeat.test.ts
import type WebSocketType from 'ws'

// Helpers
// Must match WS_HEARTBEAT_CHECK_INTERVAL in src/connection-manager/connections.ts (10 minutes, in ms).
const WS_HEARTBEAT_CHECK_INTERVAL = 10 * 60 * 1000

Check warning on line 5 in test/connections/websocket-heartbeat.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unexpected comment inline with code

Check warning on line 5 in test/connections/websocket-heartbeat.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Unexpected comment inline with code
// Must match WS_HEARTBEAT_TIMEOUT in src/connection-manager/connections.ts (60 seconds, in ms).
const WS_HEARTBEAT_TIMEOUT = 60 * 1000

Check warning on line 6 in test/connections/websocket-heartbeat.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Unexpected comment inline with code

Check warning on line 6 in test/connections/websocket-heartbeat.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Unexpected comment inline with code

// Reference to the timer function taken when this module is evaluated, i.e.
// before any test calls jest.useFakeTimers(). Fake timers replace the global
// setTimeout, and a faked zero-delay timer never fires unless the test
// advances the fake clock, which would leave `await flushPromises()` hanging.
const realSetTimeout = setTimeout

/**
 * Yields to the event loop once so that already-queued promise callbacks run.
 *
 * @returns A promise that resolves on the next real timer turn.
 */
const flushPromises = async (): Promise<void> =>
  new Promise<void>((resolve) => {
    // Braced body: a promise executor must not return the timer handle.
    realSetTimeout(resolve, 0)
  })

Check failure on line 9 in test/connections/websocket-heartbeat.test.ts

View workflow job for this annotation

GitHub Actions / build (22.x)

Return values from promise executor functions cannot be read

Check failure on line 9 in test/connections/websocket-heartbeat.test.ts

View workflow job for this annotation

GitHub Actions / build (20.x)

Return values from promise executor functions cannot be read

// Mock the WS module to a controllable EventEmitter-like class, and expose instances list
jest.mock('ws', () => {
  // Declared inside the factory because jest.mock factories are hoisted and
  // must not reference bindings from the surrounding module.
  type Listener = (...args: unknown[]) => void

  /**
   * Minimal stand-in for the `ws` WebSocket class.
   *
   * It never opens a network connection. Tests drive it by calling `emit`
   * with the event the code under test listens for, and find the sockets
   * that were created through the static `instances` list.
   */
  class MockWebSocket {
    /** Every socket constructed since this mock module was last loaded. */
    public static readonly instances: MockWebSocket[] = []

    public readonly url: string

    /** Registered listeners, grouped by event name. */
    private readonly handlers: Record<string, Listener[]> = {}

    public constructor(url: string) {
      this.url = url
      MockWebSocket.instances.push(this)
    }

    /** Registers `cb` to be called whenever `event` is emitted. */
    public on(event: string, cb: Listener): void {
      const listeners = this.handlers[event] ?? []
      listeners.push(cb)
      this.handlers[event] = listeners
    }

    /** Synchronously calls every listener registered for `event`. */
    public emit(event: string, ...args: unknown[]): void {
      const listeners = this.handlers[event] ?? []
      listeners.forEach((listener) => {
        listener(...args)
      })
    }

    public send(): void {
      // no-op: nothing is ever written to a real socket
    }

    public terminate(): void {
      // no-op: there is no real socket to close
    }
  }

  // default export
  return MockWebSocket
})

// Mock DB + connection health dependencies used by connections.ts
// Each connection-health helper is replaced by an async jest stub; the lookups
// report "not connected" and the node count is zero.
jest.mock('../../src/shared/database/connectionHealth', () => ({
  clearConnectionHealthDb: jest.fn(async () => {}),
  getTotalConnectedNodes: jest.fn(async () => 0),
  isNodeConnectedByIp: jest.fn(async () => false),
  isNodeConnectedByPublicKey: jest.fn(async () => false),
  isNodeConnectedByWsUrl: jest.fn(async () => false),
  saveConnectionHealth: jest.fn(async () => {}),
  updateConnectionHealthStatus: jest.fn(async () => {}),
}))

// Amendment database helpers are replaced by async stubs that resolve to undefined.
jest.mock('../../src/shared/database/amendments', () => ({
  fetchAmendmentInfo: jest.fn(async () => {}),
  deleteAmendmentStatus: jest.fn(async () => {}),
}))

jest.mock('../../src/shared/database', () => ({
  // Not exercised by this test; a bare stub keeps accidental calls harmless.
  query: jest.fn(),
  // An empty network list, so no additional sockets are created.
  getNetworks: jest.fn(async () => []),
  // Two nodes that already carry a ws_url, so connections.ts uses those URLs as-is.
  getNodes: jest.fn(async () => [
    { ip: 'unused', ws_url: 'ws://fresh.example', networks: 'testnet' },
    { ip: 'unused', ws_url: 'ws://stale.example', networks: 'testnet' },
  ]),
}))

// Message-handling helpers are stubbed out so emitted test messages have no side effects.
jest.mock('../../src/connection-manager/wsHandling', () => ({
  subscribe: jest.fn(),
  handleWsMessageSubscribeTypes: jest.fn(),
  fetchAmendmentsFromLedgerEntry: jest.fn(async () => {}),
  backtrackAmendmentStatus: jest.fn(async () => {}),
  handleWsMessageLedgerEnableAmendments: jest.fn(async () => {}),
}))

describe('websocket heartbeat sweep', () => {
  // The parts of the mocked `ws` socket that this suite drives directly.
  interface MockSocket {
    url: string
    emit: (event: string, ...args: unknown[]) => void
  }

  beforeEach(() => {
    jest.resetModules()
    jest.clearAllMocks()
    jest.useFakeTimers()
    jest.setSystemTime(new Date('2024-01-01T00:00:00.000Z'))
  })

  afterEach(() => {
    jest.useRealTimers()
  })

  test('marks stale connection and keeps fresh connection', async () => {
    // Import after fake timers are installed, so the module-level setInterval
    // in connections.ts is controlled by Jest.
    const { default: startConnections } = await import(
      '../../src/connection-manager/connections'
    )
    const WebSocket = (await import('ws'))
      .default as unknown as typeof WebSocketType
    const { updateConnectionHealthStatus } = await import(
      '../../src/shared/database/connectionHealth'
    )
    const healthStatusMock = updateConnectionHealthStatus as jest.Mock

    // Start the connection manager: this creates one socket per getNodes() row.
    const startPromise = startConnections()

    // Wait for the sockets to be constructed
    // (connections.ts waits for 'open' before setHandlers resolves).
    await flushPromises()
    const { instances } = WebSocket as unknown as { instances: MockSocket[] }
    expect(instances.length).toBe(2)

    // Both sockets open at t0, which records their first heartbeat at t0.
    for (const ws of instances) {
      ws.emit('open')
    }

    // Initial startup completes once the 'open' events have been handled.
    await startPromise

    // Advancing fake timers also advances Date.now(), so the refresh must
    // happen less than WS_HEARTBEAT_TIMEOUT before the sweep fires. Stop half
    // a timeout short of the sweep...
    const refreshLead = WS_HEARTBEAT_TIMEOUT / 2
    jest.advanceTimersByTime(WS_HEARTBEAT_CHECK_INTERVAL - refreshLead)

    // ...refresh only the "fresh" socket...
    const fresh = instances.find((socket) => socket.url.includes('fresh'))
    expect(fresh).toBeDefined()
    fresh?.emit('message', JSON.stringify({ type: 'noop' }))

    // ...then let the sweep run: "stale" has been silent for the whole
    // interval, "fresh" for only half a timeout.
    jest.advanceTimersByTime(refreshLead)
    await flushPromises()

    // Only the stale socket should be marked disconnected.
    expect(healthStatusMock).toHaveBeenCalledTimes(1)
    expect(healthStatusMock).toHaveBeenCalledWith('ws://stale.example', false)
  })
})
Loading