diff --git a/src/index.ts b/src/index.ts index 7e44860..4659fb4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -132,6 +132,9 @@ export interface IlpPluginBtpConstructorOptions { port: number, secret: string }, + raw?: { + socket: WebSocket + }, reconnectInterval?: number reconnectIntervals?: Array reconnectClearTryTimeout?: number @@ -210,6 +213,10 @@ export default class AbstractBtpPlugin extends EventEmitter2 { protected _wss: WebSocket.Server | null = null private _incomingWs?: WebSocket + private _raw?: { + socket: WebSocket + } + /** * Specify for a BTP instance that is acting as a client. */ @@ -231,6 +238,7 @@ export default class AbstractBtpPlugin extends EventEmitter2 { this._responseTimeout = options.responseTimeout || DEFAULT_TIMEOUT this._listener = options.listener this._server = options.server + this._raw = options.raw if (this._server) { const parsedBtpUri = new URL(this._server) @@ -322,48 +330,7 @@ export default class AbstractBtpPlugin extends EventEmitter2 { this._incomingWs = undefined wss.on('connection', (socket: WebSocket) => { - this._log.info('got connection') - let authPacket: BtpPacket - - socket.on('close', (code: number) => { - this._log.info('incoming websocket closed. code=' + code) - this._emitDisconnect() - }) - - socket.on('error', (err: Error) => { - this._log.debug('incoming websocket error. error=', err) - this._emitDisconnect() - }) - - socket.once('message', async (binaryAuthMessage: WebSocket.Data) => { - try { - authPacket = BtpPacket.deserialize(binaryAuthMessage) - this._log.trace('got auth packet. packet=%j', authPacket) - this._validateAuthPacket(authPacket) - if (this._incomingWs) { - this._closeIncomingSocket(this._incomingWs, authPacket) - } - this._incomingWs = socket - socket.send(BtpPacket.serializeResponse(authPacket.requestId, [])) - } catch (err) { - this._incomingWs = undefined - if (authPacket) { - const errorResponse = BtpPacket.serializeError({ - code: 'F00', - name: 'NotAcceptedError', - data: err.message, - triggeredAt: new Date().toISOString() - }, authPacket.requestId, []) - socket.send(errorResponse) - } - socket.close() - return - } - - this._log.trace('connection authenticated') - socket.on('message', this._handleIncomingWsMessage.bind(this, socket)) - this._emitConnect() - }) + this._handleConnection(socket, true) }) this._log.info(`listening for BTP connections on ${this._listener.port}`) } @@ -420,17 +387,22 @@ export default class AbstractBtpPlugin extends EventEmitter2 { this._ws.on('message', this._handleIncomingWsMessage.bind(this, this._ws)) } - await new Promise((resolve, reject) => { - const onDisconnect = () => { - if (this._ws) this._ws.close() - reject(new Error('connection aborted')) - } - this.once('disconnect', onDisconnect) - this.once('_first_time_connect', () => { - this.removeListener('disconnect', onDisconnect) - resolve() + /* Raw WS Login */ + if (this._raw) { + this._handleConnection(this._raw.socket, false) + } else { + await new Promise((resolve, reject) => { + const onDisconnect = () => { + if (this._ws) this._ws.close() + reject(new Error('connection aborted')) + } + this.once('disconnect', onDisconnect) + this.once('_first_time_connect', () => { + this.removeListener('disconnect', onDisconnect) + resolve() + }) }) - }) + } /* To be overriden. */ await this._connect() @@ -479,6 +451,7 @@ export default class AbstractBtpPlugin extends EventEmitter2 { this._incomingWs.close() this._incomingWs = undefined } + if (this._raw) this._raw.socket.close() if (this._wss) this._wss.close() } @@ -486,6 +459,57 @@ export default class AbstractBtpPlugin extends EventEmitter2 { return this._readyState === ReadyState.CONNECTED } + _handleConnection (socket: WebSocket, authenticate: boolean) { + this._log.info('got connection') + + socket.on('close', (code: number) => { + this._log.info('incoming websocket closed. code=' + code) + this._emitDisconnect() + }) + + socket.on('error', (err: Error) => { + this._log.debug('incoming websocket error. error=', err) + this._emitDisconnect() + }) + + if (authenticate) { + let authPacket: BtpPacket + socket.once('message', async (binaryAuthMessage: WebSocket.Data) => { + try { + authPacket = BtpPacket.deserialize(binaryAuthMessage) + this._log.trace('got auth packet. packet=%j', authPacket) + this._validateAuthPacket(authPacket) + if (this._incomingWs) { + this._closeIncomingSocket(this._incomingWs, authPacket) + } + this._incomingWs = socket + socket.send(BtpPacket.serializeResponse(authPacket.requestId, [])) + } catch (err) { + this._incomingWs = undefined + if (authPacket) { + const errorResponse = BtpPacket.serializeError({ + code: 'F00', + name: 'NotAcceptedError', + data: err.message, + triggeredAt: new Date().toISOString() + }, authPacket.requestId, []) + socket.send(errorResponse) + } + socket.close() + return + } + + this._log.trace('connection authenticated') + socket.on('message', this._handleIncomingWsMessage.bind(this, socket)) + this._emitConnect() + }) + } else { + this._incomingWs = socket + socket.on('message', this._handleIncomingWsMessage.bind(this, socket)) + this._emitConnect() + } + } + /** * Deserialize incoming websocket message and call `handleIncomingBtpPacket`. * If error in handling btp packet, call `handleOutgoingBtpPacket` and send diff --git a/test/index.test.js b/test/index.test.js index de38e51..1a437fe 100644 --- a/test/index.test.js +++ b/test/index.test.js @@ -1,9 +1,12 @@ 'use strict' +const BtpPacket = require('btp-packet'); const assert = require('assert') const btp = require('btp-packet') const Plugin = require('..') const mockSocket = require('./helpers/mockSocket') +const WebSocket = require('ws'); + describe('BtpPlugin', function () { beforeEach(async function () { @@ -120,6 +123,58 @@ describe('BtpPlugin', function () { }) }) + describe('can pass in websocket connection', function () { + + beforeEach(async function () { + this.client = new Plugin(this.clientOpts) + }) + + + afterEach(async function () { + await this.client.disconnect() + }) + + it('get incoming socket connection and intantiate plugin', async function () { + return new Promise(resolve => { + const ws = new WebSocket.Server({ port: 9000 }) + let clientConnect = null + + + ws.on('connection', async (connection) => { + + //Manually reply to the auth message + connection.once('message', async (data) => { + const authPacket = BtpPacket.deserialize(data) + connection.send(BtpPacket.serializeResponse(authPacket.requestId, [])) + }) + + this.server = new Plugin({raw: {socket: connection}}) + + await Promise.all([ + clientConnect, + this.server.connect() + ]) + + assert.strictEqual(this.server.isConnected(), true) + assert.strictEqual(this.client.isConnected(), true) + + this.server.registerDataHandler((ilp) => { + assert.deepEqual(ilp, Buffer.from('foo')) + return Buffer.from('bar') + }) + + const response = await this.client.sendData(Buffer.from('foo')) + assert.deepEqual(response, Buffer.from('bar')) + await this.server.disconnect() + ws.close() + resolve() + }) + + clientConnect = this.client.connect() + }) + }) +}) + describe('alternate client account/token config', function () { beforeEach(async function () { this.server = new Plugin(this.serverOpts)