Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
212 changes: 212 additions & 0 deletions packages/streams/src/mdns-ws.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,212 @@
import { expect } from 'chai'
import MdnsWs from './mdns-ws'
import { createMockApp, createDebugStub } from './test-helpers'

const SK_HELLO = JSON.stringify({
name: 'test-server',
version: '2.0.0',
roles: ['master'],
self: 'vessels.urn:mrn:imo:mmsi:123456789'
})

const { WebSocketServer } = require('ws')

const { Writable } = require('stream')

function createSkWsServer(): Promise<{
wss: InstanceType<typeof WebSocketServer>
port: number
close: () => Promise<void>
}> {
return new Promise((resolve) => {
const wss = new WebSocketServer(
{
port: 0,
host: '127.0.0.1',
path: '/signalk/v1/stream'
},
() => {
const addr = wss.address() as { port: number }
resolve({
wss,
port: addr.port,
close: () =>
new Promise<void>((res) => {
wss.clients.forEach((c: { close: () => void }) => c.close())
wss.close(() => res())
})
})
}
)

wss.on('connection', (ws: { send: (data: string) => void }) => {
ws.send(SK_HELLO)
})
})
}

function createSink(): {
chunks: unknown[]
writable: InstanceType<typeof Writable>
} {
const chunks: unknown[] = []
const writable = new Writable({
objectMode: true,
write(chunk: unknown, _encoding: string, callback: () => void) {
chunks.push(chunk)
writable.emit('data-received', chunk)
callback()
}
})
return { chunks, writable }
}

describe('MdnsWs', () => {
it('sets provider status on successful connection', function (done) {
this.timeout(10000)
let server: Awaited<ReturnType<typeof createSkWsServer>>

createSkWsServer().then((s) => {
server = s
const app = createMockApp()
const mdns = new MdnsWs({
app,
providerId: 'test-mdns',
host: '127.0.0.1',
port: server.port,
createDebug: createDebugStub()
})

mdns.pipe(createSink().writable)

const check = setInterval(() => {
if (app.providerStatuses.some((s) => s.msg.includes('connected'))) {
clearInterval(check)
mdns.destroy()
server.close().then(() => done())
}
}, 100)
})
})

it('sets provider error on connection failure', function (done) {
this.timeout(10000)
const app = createMockApp()
const mdns = new MdnsWs({
app,
providerId: 'test-mdns',
host: '127.0.0.1',
port: 1,
createDebug: createDebugStub()
})

mdns.pipe(createSink().writable)

const check = setInterval(() => {
if (app.providerErrors.length > 0) {
clearInterval(check)
expect(app.providerErrors[0]!.id).to.equal('test-mdns')
mdns.destroy()
done()
}
}, 100)
})

it('detects disconnect when server closes', function (done) {
this.timeout(10000)
let server: Awaited<ReturnType<typeof createSkWsServer>>

createSkWsServer().then((s) => {
server = s
const app = createMockApp()
const mdns = new MdnsWs({
app,
providerId: 'test-mdns',
host: '127.0.0.1',
port: server.port,
createDebug: createDebugStub()
})

mdns.pipe(createSink().writable)

const checkConnected = setInterval(() => {
if (app.providerStatuses.some((s) => s.msg.includes('connected'))) {
clearInterval(checkConnected)

server.close().then(() => {
const checkDisconnect = setInterval(() => {
if (
app.providerErrors.some(
(e) => e.msg.includes('disconnect') || e.msg !== ''
)
) {
clearInterval(checkDisconnect)
mdns.destroy()
done()
}
}, 100)
})
}
}, 100)
})
})

it('receives delta data through the stream', function (done) {
this.timeout(10000)
let server: Awaited<ReturnType<typeof createSkWsServer>>

createSkWsServer().then((s) => {
server = s

server.wss.on('connection', (ws: { send: (data: string) => void }) => {
setTimeout(() => {
ws.send(
JSON.stringify({
context: 'vessels.urn:mrn:imo:mmsi:123456789',
updates: [
{
values: [{ path: 'navigation.speedOverGround', value: 3.5 }],
source: { label: 'test' }
}
]
})
)
}, 200)
})

const app = createMockApp()
const mdns = new MdnsWs({
app,
providerId: 'test-mdns',
host: '127.0.0.1',
port: server.port,
createDebug: createDebugStub()
})

const { chunks, writable } = createSink()
mdns.pipe(writable)

interface DeltaChunk {
updates?: Array<{
values?: Array<{ path: string }>
$source?: string
}>
}

writable.on('data-received', () => {
const delta = chunks.find(
(c) =>
(c as DeltaChunk)?.updates?.[0]?.values?.[0]?.path ===
'navigation.speedOverGround'
)
if (delta) {
expect((delta as DeltaChunk).updates![0]!['$source']).to.include(
'test-mdns'
)
mdns.destroy()
server.close().then(() => done())
}
})
})
})
})
122 changes: 64 additions & 58 deletions packages/streams/src/mdns-ws.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,67 +112,71 @@ export default class MdnsWs extends Transform {
}

private connectClient(client: Client): void {
this.fetchedMetaPaths.clear()
client
.connect()
.then(() => {
this.options.app.setProviderStatus(
this.options.providerId,
`ws connection connected to ${client.options.hostname}:${client.options.port}`
)
console.log(
`ws connection connected to ${client.options.hostname}:${client.options.port}`
)
if (this.options.selfHandling === 'useRemoteSelf') {
client
.API()
.then((api) => api.get('/self'))
.then((selfFromServer) => {
this.debug(
`Mapping context ${selfFromServer} to self (empty context)`
)
this.handleContext = (delta) => {
if (delta.context === selfFromServer) {
delete delta.context
}
}
})
.catch((err) => {
console.error('Error retrieving self from remote server')
console.error(err)
})
}
this.remoteServers[
client.options.hostname + ':' + client.options.port
] = client
if (this.options.subscription) {
let parsed: object | object[]
try {
parsed = JSON.parse(this.options.subscription)
} catch (ex) {
const error = ex as Error
this.options.app.setProviderError(
this.options.providerId,
`unable to parse subscription json: ${this.options.subscription}: ${error.message}`
)
console.error(
`unable to parse subscription json: ${this.options.subscription}: ${error.message}`
client.on('connect', () => {
this.fetchedMetaPaths.clear()
this.options.app.setProviderStatus(
this.options.providerId,
`ws connection connected to ${client.options.hostname}:${client.options.port}`
)
console.log(
`ws connection connected to ${client.options.hostname}:${client.options.port}`
)
if (this.options.selfHandling === 'useRemoteSelf') {
client
.API()
.then((api) => api.get('/self'))
.then((selfFromServer) => {
this.debug(
`Mapping context ${selfFromServer} to self (empty context)`
)
return
}
if (!Array.isArray(parsed)) {
parsed = [parsed]
}
;(parsed as object[]).forEach((sub: object, idx: number) => {
this.debug('sending subscription %j', sub)
client.subscribe(sub, String(idx))
this.handleContext = (delta) => {
if (delta.context === selfFromServer) {
delete delta.context
}
}
})
.catch((err) => {
console.error('Error retrieving self from remote server')
console.error(err)
})
}
this.remoteServers[client.options.hostname + ':' + client.options.port] =
client
if (this.options.subscription) {
let parsed: object | object[]
try {
parsed = JSON.parse(this.options.subscription)
} catch (ex) {
const error = ex as Error
this.options.app.setProviderError(
this.options.providerId,
`unable to parse subscription json: ${this.options.subscription}: ${error.message}`
)
console.error(
`unable to parse subscription json: ${this.options.subscription}: ${error.message}`
)
return
}
})
.catch((err: Error) => {
this.options.app.setProviderError(this.options.providerId, err.message)
console.error(err.message)
})
if (!Array.isArray(parsed)) {
parsed = [parsed]
}
;(parsed as object[]).forEach((sub: object, idx: number) => {
this.debug('sending subscription %j', sub)
client.subscribe(sub, String(idx))
})
}
})

client.on('disconnect', () => {
this.options.app.setProviderError(
this.options.providerId,
`ws connection disconnected from ${client.options.hostname}:${client.options.port}`
)
})

client.on('error', (err: Error) => {
this.options.app.setProviderError(this.options.providerId, err.message)
})

client.on('delta', (data: DeltaMessage) => {
if (data && data.updates) {
Expand Down Expand Up @@ -202,6 +206,8 @@ export default class MdnsWs extends Transform {
}
}
})

client.connect().catch(() => {})
}

private fetchMetaIfNeeded(
Expand Down
Loading