Skip to content
This repository was archived by the owner on Feb 8, 2023. It is now read-only.

Allow instantiation with existing ws #49

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
128 changes: 76 additions & 52 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,9 @@ export interface IlpPluginBtpConstructorOptions {
port: number,
secret: string
},
raw?: {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for this extra layer of indirection?

Also, from the option alone it isn't clear whether the created plugin will act as a client or server. Maybe it should be server_socket (in the top-level options), or listener.socket.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a reason for this extra layer of indirection?

Good suggestion. I think listener.socket makes the most sense.

Also, from the option alone it isn't clear whether the created plugin will act as a client or server.

The plugin is not going to act as a WS server because it is created around an existing ws socket so it's not listening for new connections.

BUT it will handle incoming auth messages (i.e. it's a "BTP server" in as much as the difference between a client and server is that one performs auth with the other even though they may actually be peers)

This is a general problem (I think) with the architecture of the plugin in that it mixes the transport (websockets) with the protocol (BTP).

I tried a more decoupled approach with ilp-transport, interested to hear what you think of that and if we could move to something similar with BTP: https://github.com/adrianhopebailie/ilp-transport

socket: WebSocket
},
reconnectInterval?: number
reconnectIntervals?: Array<number>
reconnectClearTryTimeout?: number
Expand Down Expand Up @@ -210,6 +213,10 @@ export default class AbstractBtpPlugin extends EventEmitter2 {
protected _wss: WebSocket.Server | null = null
private _incomingWs?: WebSocket

private _raw?: {
socket: WebSocket
}

/**
* Specify for a BTP instance that is acting as a client.
*/
Expand All @@ -231,6 +238,7 @@ export default class AbstractBtpPlugin extends EventEmitter2 {
this._responseTimeout = options.responseTimeout || DEFAULT_TIMEOUT
this._listener = options.listener
this._server = options.server
this._raw = options.raw

if (this._server) {
const parsedBtpUri = new URL(this._server)
Expand Down Expand Up @@ -322,48 +330,7 @@ export default class AbstractBtpPlugin extends EventEmitter2 {
this._incomingWs = undefined

wss.on('connection', (socket: WebSocket) => {
this._log.info('got connection')
let authPacket: BtpPacket

socket.on('close', (code: number) => {
this._log.info('incoming websocket closed. code=' + code)
this._emitDisconnect()
})

socket.on('error', (err: Error) => {
this._log.debug('incoming websocket error. error=', err)
this._emitDisconnect()
})

socket.once('message', async (binaryAuthMessage: WebSocket.Data) => {
try {
authPacket = BtpPacket.deserialize(binaryAuthMessage)
this._log.trace('got auth packet. packet=%j', authPacket)
this._validateAuthPacket(authPacket)
if (this._incomingWs) {
this._closeIncomingSocket(this._incomingWs, authPacket)
}
this._incomingWs = socket
socket.send(BtpPacket.serializeResponse(authPacket.requestId, []))
} catch (err) {
this._incomingWs = undefined
if (authPacket) {
const errorResponse = BtpPacket.serializeError({
code: 'F00',
name: 'NotAcceptedError',
data: err.message,
triggeredAt: new Date().toISOString()
}, authPacket.requestId, [])
socket.send(errorResponse)
}
socket.close()
return
}

this._log.trace('connection authenticated')
socket.on('message', this._handleIncomingWsMessage.bind(this, socket))
this._emitConnect()
})
this._handleConnection(socket, true)
})
this._log.info(`listening for BTP connections on ${this._listener.port}`)
}
Expand Down Expand Up @@ -420,17 +387,22 @@ export default class AbstractBtpPlugin extends EventEmitter2 {
this._ws.on('message', this._handleIncomingWsMessage.bind(this, this._ws))
}

await new Promise((resolve, reject) => {
const onDisconnect = () => {
if (this._ws) this._ws.close()
reject(new Error('connection aborted'))
}
this.once('disconnect', onDisconnect)
this.once('_first_time_connect', () => {
this.removeListener('disconnect', onDisconnect)
resolve()
/* Raw WS Login */
if (this._raw) {
this._handleConnection(this._raw.socket, false)
} else {
await new Promise((resolve, reject) => {
const onDisconnect = () => {
if (this._ws) this._ws.close()
reject(new Error('connection aborted'))
}
this.once('disconnect', onDisconnect)
this.once('_first_time_connect', () => {
this.removeListener('disconnect', onDisconnect)
resolve()
})
})
})
}

/* To be overriden. */
await this._connect()
Expand Down Expand Up @@ -479,13 +451,65 @@ export default class AbstractBtpPlugin extends EventEmitter2 {
this._incomingWs.close()
this._incomingWs = undefined
}
if (this._raw) this._raw.socket.close()
if (this._wss) this._wss.close()
}

isConnected () {
return this._readyState === ReadyState.CONNECTED
}

_handleConnection (socket: WebSocket, authenticate: boolean) {
this._log.info('got connection')

socket.on('close', (code: number) => {
this._log.info('incoming websocket closed. code=' + code)
this._emitDisconnect()
})

socket.on('error', (err: Error) => {
this._log.debug('incoming websocket error. error=', err)
this._emitDisconnect()
})

if (authenticate) {
let authPacket: BtpPacket
socket.once('message', async (binaryAuthMessage: WebSocket.Data) => {
try {
authPacket = BtpPacket.deserialize(binaryAuthMessage)
this._log.trace('got auth packet. packet=%j', authPacket)
this._validateAuthPacket(authPacket)
if (this._incomingWs) {
this._closeIncomingSocket(this._incomingWs, authPacket)
}
this._incomingWs = socket
socket.send(BtpPacket.serializeResponse(authPacket.requestId, []))
} catch (err) {
this._incomingWs = undefined
if (authPacket) {
const errorResponse = BtpPacket.serializeError({
code: 'F00',
name: 'NotAcceptedError',
data: err.message,
triggeredAt: new Date().toISOString()
}, authPacket.requestId, [])
socket.send(errorResponse)
}
socket.close()
return
}

this._log.trace('connection authenticated')
socket.on('message', this._handleIncomingWsMessage.bind(this, socket))
this._emitConnect()
})
} else {
this._incomingWs = socket
socket.on('message', this._handleIncomingWsMessage.bind(this, socket))
this._emitConnect()
}
}

/**
* Deserialize incoming websocket message and call `handleIncomingBtpPacket`.
* If error in handling btp packet, call `handleOutgoingBtpPacket` and send
Expand Down
55 changes: 55 additions & 0 deletions test/index.test.js
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
'use strict'

const BtpPacket = require('btp-packet');
const assert = require('assert')
const btp = require('btp-packet')
const Plugin = require('..')
const mockSocket = require('./helpers/mockSocket')
const WebSocket = require('ws');


describe('BtpPlugin', function () {
beforeEach(async function () {
Expand Down Expand Up @@ -120,6 +123,58 @@ describe('BtpPlugin', function () {
})
})

describe('can pass in websocket connection', function () {

beforeEach(async function () {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The rest of this project is 2-space indented.

this.client = new Plugin(this.clientOpts)
})


afterEach(async function () {
await this.client.disconnect()
})

it('get incoming socket connection and intantiate plugin', async function () {
return new Promise(resolve => {
const ws = new WebSocket.Server({ port: 9000 })
let clientConnect = null


ws.on('connection', async (connection) => {

//Manually reply to the auth message
connection.once('message', async (data) => {
const authPacket = BtpPacket.deserialize(data)
connection.send(BtpPacket.serializeResponse(authPacket.requestId, []))
})

this.server = new Plugin({raw: {socket: connection}})

await Promise.all([
clientConnect,
this.server.connect()
])

assert.strictEqual(this.server.isConnected(), true)
assert.strictEqual(this.client.isConnected(), true)

this.server.registerDataHandler((ilp) => {
assert.deepEqual(ilp, Buffer.from('foo'))
return Buffer.from('bar')
})

const response = await this.client.sendData(Buffer.from('foo'))
assert.deepEqual(response, Buffer.from('bar'))
await this.server.disconnect()
ws.close()
resolve()
})

clientConnect = this.client.connect()
})
})
})

describe('alternate client account/token config', function () {
beforeEach(async function () {
this.server = new Plugin(this.serverOpts)
Expand Down