Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
2ebe07b
WIP Initial exponential backoff reconnect implementation.
chr15m Sep 27, 2025
65f9642
Simplify onclose reconnect.
chr15m Sep 28, 2025
6e028f2
Allow Relay() to be instantiated with enablePing.
chr15m Sep 28, 2025
564af1b
Merge branch 'master' into reconnect-with-exponential-backoff
chr15m Sep 28, 2025
b178f3d
Allow enbleReconnect to be passed through.
chr15m Sep 28, 2025
10904ab
Tweak enablePing and enableReconnect in doc.
chr15m Sep 28, 2025
2545932
Make resubscribeBackoff public/configurable.
chr15m Sep 28, 2025
ca18534
Simulate unresponsive relay in MockRelay.
chr15m Sep 28, 2025
48ccd6c
Add tests for enablePing.
chr15m Sep 28, 2025
0b1d370
Linter fixes.
chr15m Sep 28, 2025
7937087
Merge branch 'ping-improvements' into reconnect-with-exponential-backoff
chr15m Sep 28, 2025
e7e7129
Remove redundant forced close since ws.close triggers it.
chr15m Sep 28, 2025
234c1c2
Remove redundant closeAllSubscriptions - ws close triggers it.
chr15m Sep 28, 2025
0da7c75
Tests for enableReconnect exponential backoff.
chr15m Sep 28, 2025
ce1c606
Suppress uncaught onevent in reconnect test.
chr15m Sep 28, 2025
7e1e024
Use safer ternary in pinpong.
chr15m Sep 28, 2025
d74d8b5
Pingpong tests simluating node and browser.
chr15m Sep 28, 2025
61dc0af
Fix reconnect loops and pool reconnect tracking.
chr15m Sep 28, 2025
5f71a52
Properly track pingTimeout across disconnects.
chr15m Sep 28, 2025
b5a357a
Prevent spurious closed socket warning.
chr15m Sep 28, 2025
7af5051
Tweak docs.
chr15m Sep 28, 2025
fee2600
Linter fix.
chr15m Sep 28, 2025
a31b1a3
Merge branch 'master' into reconnect-with-exponential-backoff
chr15m Sep 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -133,14 +133,44 @@ import WebSocket from 'ws'
useWebSocketImplementation(WebSocket)
```

You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms don't report websocket disconnections due to network issues, and enabling this can increase reliability.
#### enablePing

You can enable regular pings of connected relays with the `enablePing` option. This will set up a heartbeat that closes the websocket if it doesn't receive a response in time. Some platforms, like Node.js, don't report websocket disconnections due to network issues, and enabling this can increase the reliability of the `onclose` event.

```js
import { SimplePool } from 'nostr-tools/pool'

const pool = new SimplePool({ enablePing: true })
```

#### enableReconnect

You can also enable automatic reconnection with the `enableReconnect` option. This will make the pool try to reconnect to relays with an exponential backoff delay if the connection is lost unexpectedly.

```js
import { SimplePool } from 'nostr-tools/pool'

const pool = new SimplePool({ enableReconnect: true })
```

Using both `enablePing: true` and `enableReconnect: true` is recommended as it will improve the reliability and timeliness of the reconnection (at the expense of slighly higher bandwidth due to the ping messages).

```js
// on Node.js
const pool = new SimplePool({ enablePing: true, enableReconnect: true })
```

The `enableReconnect` option can also be a callback function which will receive the current subscription filters and should return a new set of filters. This is useful if you want to modify the subscription on reconnect, for example, to update the `since` parameter to fetch only new events.

```js
const pool = new SimplePool({
enableReconnect: (filters) => {
const newSince = Math.floor(Date.now() / 1000)
return filters.map(filter => ({ ...filter, since: newSince }))
}
})
```

### Parsing references (mentions) from a content based on NIP-27

```js
Expand Down
7 changes: 6 additions & 1 deletion abstract-pool.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ export class AbstractSimplePool {

public verifyEvent: Nostr['verifyEvent']
public enablePing: boolean | undefined
public enableReconnect: boolean | ((filters: Filter[]) => Filter[]) | undefined
public trustedRelayURLs: Set<string> = new Set()

private _WebSocket?: typeof WebSocket
Expand All @@ -41,6 +42,7 @@ export class AbstractSimplePool {
this.verifyEvent = opts.verifyEvent
this._WebSocket = opts.websocketImplementation
this.enablePing = opts.enablePing
this.enableReconnect = opts.enableReconnect
}

async ensureRelay(url: string, params?: { connectionTimeout?: number }): Promise<AbstractRelay> {
Expand All @@ -52,9 +54,12 @@ export class AbstractSimplePool {
verifyEvent: this.trustedRelayURLs.has(url) ? alwaysTrue : this.verifyEvent,
websocketImplementation: this._WebSocket,
enablePing: this.enablePing,
enableReconnect: this.enableReconnect,
})
relay.onclose = () => {
this.relays.delete(url)
if (relay && !relay.enableReconnect) {
this.relays.delete(url)
}
}
if (params?.connectionTimeout) relay.connectionTimeout = params.connectionTimeout
this.relays.set(url, relay)
Expand Down
91 changes: 76 additions & 15 deletions abstract-relay.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ export type AbstractRelayConstructorOptions = {
verifyEvent: Nostr['verifyEvent']
websocketImplementation?: typeof WebSocket
enablePing?: boolean
enableReconnect?: boolean | ((filters: Filter[]) => Filter[])
}

export class SendingOnClosedConnection extends Error {
Expand All @@ -37,9 +38,15 @@ export class AbstractRelay {
public publishTimeout: number = 4400
public pingFrequency: number = 20000
public pingTimeout: number = 20000
public resubscribeBackoff: number[] = [10000, 10000, 10000, 20000, 20000, 30000, 60000]
public openSubs: Map<string, Subscription> = new Map()
public enablePing: boolean | undefined
public enableReconnect: boolean | ((filters: Filter[]) => Filter[])
private connectionTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private reconnectTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private pingTimeoutHandle: ReturnType<typeof setTimeout> | undefined
private reconnectAttempts: number = 0
private closedIntentionally: boolean = false

private connectionPromise: Promise<void> | undefined
private openCountRequests = new Map<string, CountResolver>()
Expand All @@ -59,6 +66,7 @@ export class AbstractRelay {
this.verifyEvent = opts.verifyEvent
this._WebSocket = opts.websocketImplementation || WebSocket
this.enablePing = opts.enablePing
this.enableReconnect = opts.enableReconnect || false
}

static async connect(url: string, opts: AbstractRelayConstructorOptions): Promise<AbstractRelay> {
Expand Down Expand Up @@ -88,6 +96,40 @@ export class AbstractRelay {
return this._connected
}

private async reconnect(): Promise<void> {
const backoff = this.resubscribeBackoff[Math.min(this.reconnectAttempts, this.resubscribeBackoff.length - 1)]
this.reconnectAttempts++

this.reconnectTimeoutHandle = setTimeout(async () => {
try {
await this.connect()
} catch (err) {
// this will be called again through onclose/onerror
}
}, backoff)
}

private handleHardClose(reason: string) {
if (this.pingTimeoutHandle) {
clearTimeout(this.pingTimeoutHandle)
this.pingTimeoutHandle = undefined
}

this._connected = false
this.connectionPromise = undefined

const wasIntentional = this.closedIntentionally
this.closedIntentionally = false // reset for next time

this.onclose?.()

if (this.enableReconnect && !wasIntentional) {
this.reconnect()
} else {
this.closeAllSubscriptions(reason)
}
}

public async connect(): Promise<void> {
if (this.connectionPromise) return this.connectionPromise

Expand All @@ -110,8 +152,23 @@ export class AbstractRelay {
}

this.ws.onopen = () => {
if (this.reconnectTimeoutHandle) {
clearTimeout(this.reconnectTimeoutHandle)
this.reconnectTimeoutHandle = undefined
}
clearTimeout(this.connectionTimeoutHandle)
this._connected = true
this.reconnectAttempts = 0

// resubscribe to all open subscriptions
for (const sub of this.openSubs.values()) {
sub.eosed = false
if (typeof this.enableReconnect === 'function') {
sub.filters = this.enableReconnect(sub.filters)
}
sub.fire()
}

if (this.enablePing) {
this.pingpong()
}
Expand All @@ -121,19 +178,13 @@ export class AbstractRelay {
this.ws.onerror = ev => {
clearTimeout(this.connectionTimeoutHandle)
reject((ev as any).message || 'websocket error')
this._connected = false
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection errored')
this.handleHardClose('relay connection errored')
}

this.ws.onclose = ev => {
clearTimeout(this.connectionTimeoutHandle)
reject((ev as any).message || 'websocket closed')
this._connected = false
this.connectionPromise = undefined
this.onclose?.()
this.closeAllSubscriptions('relay connection closed')
this.handleHardClose('relay connection closed')
}

this.ws.onmessage = this._onmessage.bind(this)
Expand All @@ -145,7 +196,7 @@ export class AbstractRelay {
private async waitForPingPong() {
return new Promise((res, err) => {
// listen for pong
;(this.ws && this.ws.on && this.ws.on('pong', () => res(true))) || err("ws can't listen for pong")
this.ws && this.ws.on ? this.ws.on('pong', () => res(true)) : err("ws can't listen for pong")
// send a ping
this.ws && this.ws.ping && this.ws.ping()
})
Expand Down Expand Up @@ -178,13 +229,12 @@ export class AbstractRelay {
])
if (result) {
// schedule another pingpong
setTimeout(() => this.pingpong(), this.pingFrequency)
this.pingTimeoutHandle = setTimeout(() => this.pingpong(), this.pingFrequency)
} else {
// pingpong closing socket
this.closeAllSubscriptions('pingpong timed out')
this._connected = false
this.onclose?.()
this.ws?.close()
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws?.close()
}
}
}
}
Expand Down Expand Up @@ -372,10 +422,21 @@ export class AbstractRelay {
}

public close() {
this.closedIntentionally = true
if (this.reconnectTimeoutHandle) {
clearTimeout(this.reconnectTimeoutHandle)
this.reconnectTimeoutHandle = undefined
}
if (this.pingTimeoutHandle) {
clearTimeout(this.pingTimeoutHandle)
this.pingTimeoutHandle = undefined
}
this.closeAllSubscriptions('relay connection closed by us')
this._connected = false
this.onclose?.()
this.ws?.close()
if (this.ws?.readyState === WebSocket.OPEN) {
this.ws?.close()
}
}

// this is the function assigned to this.ws.onmessage
Expand Down
114 changes: 114 additions & 0 deletions pool.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,120 @@ test('ping-pong timeout in pool', async () => {
expect(closed).toBeTrue()
})

test('reconnect on disconnect in pool', async () => {
const mockRelay = mockRelays[0]
pool = new SimplePool({ enablePing: true, enableReconnect: true })
const relay = await pool.ensureRelay(mockRelay.url)
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]

let closes = 0
relay.onclose = () => {
closes++
}

expect(relay.connected).toBeTrue()

// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)

// now make it unresponsive
mockRelay.unresponsive = true

// wait for the second ping to fail, which will trigger a close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()

// now make it responsive again
mockRelay.unresponsive = false

// wait for reconnect
await new Promise(resolve => {
const interval = setInterval(() => {
if (relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})

expect(relay.connected).toBeTrue()
expect(closes).toBe(1)
})

test('reconnect with filter update in pool', async () => {
const mockRelay = mockRelays[0]
const newSince = Math.floor(Date.now() / 1000)
pool = new SimplePool({
enablePing: true,
enableReconnect: filters => {
return filters.map(f => ({ ...f, since: newSince }))
},
})
const relay = await pool.ensureRelay(mockRelay.url)
relay.pingTimeout = 50
relay.pingFrequency = 50
relay.resubscribeBackoff = [50, 100]

let closes = 0
relay.onclose = () => {
closes++
}

expect(relay.connected).toBeTrue()

const sub = relay.subscribe([{ kinds: [1], since: 0 }], { onevent: () => {} })
expect(sub.filters[0].since).toBe(0)

// wait for the first ping to succeed
await new Promise(resolve => setTimeout(resolve, 75))
expect(closes).toBe(0)

// now make it unresponsive
mockRelay.unresponsive = true

// wait for the second ping to fail, which will trigger a close
await new Promise(resolve => {
const interval = setInterval(() => {
if (closes > 0) {
clearInterval(interval)
resolve(null)
}
}, 10)
})
expect(closes).toBe(1)
expect(relay.connected).toBeFalse()

// now make it responsive again
mockRelay.unresponsive = false

// wait for reconnect
await new Promise(resolve => {
const interval = setInterval(() => {
if (relay.connected) {
clearInterval(interval)
resolve(null)
}
}, 10)
})

expect(relay.connected).toBeTrue()
expect(closes).toBe(1)

// check if filter was updated
expect(sub.filters[0].since).toBe(newSince)
})

test('track relays when publishing', async () => {
let event1 = finalizeEvent(
{
Expand Down
4 changes: 2 additions & 2 deletions pool.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/* global WebSocket */

import { verifyEvent } from './pure.ts'
import { AbstractSimplePool } from './abstract-pool.ts'
import { AbstractSimplePool, type AbstractPoolConstructorOptions } from './abstract-pool.ts'

var _WebSocket: typeof WebSocket

Expand All @@ -14,7 +14,7 @@ export function useWebSocketImplementation(websocketImplementation: any) {
}

export class SimplePool extends AbstractSimplePool {
constructor(options?: { enablePing?: boolean }) {
constructor(options?: Pick<AbstractPoolConstructorOptions, 'enablePing' | 'enableReconnect'>) {
super({ verifyEvent, websocketImplementation: _WebSocket, ...options })
}
}
Expand Down
Loading