Skip to content
This repository was archived by the owner on May 5, 2022. It is now read-only.

Commit add8372

Browse files
authored
Merge pull request #136 from interledgerjs/ko-server-improvements
feat: server reliability improvements
2 parents d355ef4 + 1ee52bf commit add8372

File tree

10 files changed

+1361
-1224
lines changed

10 files changed

+1361
-1224
lines changed

package-lock.json

Lines changed: 1101 additions & 1185 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,7 @@
7070
"ilp-plugin-mini-accounts": "^4.2.0",
7171
"mocha": "^5.1.1",
7272
"mocha-typescript": "^1.1.14",
73-
"nyc": "^13.1.0",
73+
"nyc": "^15.0.0",
7474
"puppeteer": "^1.19.0",
7575
"sinon": "^6.0.1",
7676
"source-map-support": "^0.5.6",

src/connection.ts

Lines changed: 57 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
import { EventEmitter } from 'events'
2-
import * as assert from 'assert'
32
import createLogger from 'ilp-logger'
43
import { DataAndMoneyStream } from './stream'
54
import * as IlpPacket from 'ilp-packet'
@@ -29,15 +28,12 @@ import { Reader } from 'oer-utils'
2928
import { CongestionController } from './util/congestion'
3029
import { Plugin } from './util/plugin-interface'
3130
import {
32-
LongValue,
33-
longFromValue,
3431
maxLong,
3532
minLong,
3633
minLongs,
3734
countDigits,
3835
checkedAdd,
3936
checkedSubtract,
40-
checkedMultiply,
4137
multiplyDivideFloor
4238
} from './util/long'
4339
import * as Long from 'long'
@@ -55,6 +51,8 @@ const DEFAULT_MINIMUM_EXCHANGE_RATE_PRECISION = 3
5551
const TEST_PACKET_MAX_ATTEMPTS = 20
5652

5753
export interface ConnectionOpts {
54+
/** Token in the ILP address uniquely identifying this connection */
55+
connectionId?: string,
5856
/** Ledger plugin (V2) */
5957
plugin: Plugin,
6058
/** ILP Address of the remote entity */
@@ -93,6 +91,17 @@ export interface ConnectionOpts {
9391
* When omitted, use a timeout of 30 seconds.
9492
*/
9593
getExpiry?: (destination: string) => Date,
94+
/**
95+
* Callback for the consumer to perform accounting and choose to fulfill an incoming ILP Prepare,
96+
* given the amount received, a unique identifier for the packet, and `connectionTag`.
97+
*
98+
* If the returned Promise is resolved, the ILP Prepare will be fulfilled; if it is rejected,
99+
* the ILP Prepare will be rejected. The ILP Fulfill will be immediately sent back after
100+
* the Promise is resolved.
101+
*
102+
* Note: a misbehaving sender can trigger duplicate packetIds, which should be ignored and rejected.
103+
*/
104+
shouldFulfill?: (packetAmount: Long, packetId: Buffer, connectionTag?: string) => Promise<void>,
96105
}
97106

98107
export interface BuildConnectionOpts extends ConnectionOpts {
@@ -181,15 +190,21 @@ export class Connection extends EventEmitter {
181190

182191
// TODO use longs for byte offsets
183192
protected remoteMaxOffset: number
193+
protected _incomingHold: Long
184194
protected _totalReceived: Long
185195
protected _totalSent: Long
186196
protected _totalDelivered: Long
187197
protected _lastPacketExchangeRate: Rational
188198
protected getExpiry: (destination: string) => Date
199+
protected shouldFulfill?: (packetAmount: Long, packetId: Buffer, connectionTag?: string) => Promise<void>
189200

190201
constructor (opts: NewConnectionOpts) {
191202
super()
192-
this.connectionId = uuid()
203+
204+
// Use the same connectionId for logging on both client & server
205+
const lastAddressSegment = opts.destinationAccount ? opts.destinationAccount.split('.').slice(-1)[0] : undefined
206+
this.connectionId = (opts.connectionId || lastAddressSegment || uuid()).replace(/[-_]/g, '').slice(0, 8)
207+
193208
this.plugin = opts.plugin
194209
this._sourceAccount = opts.sourceAccount
195210
this._sourceAssetCode = opts.assetCode
@@ -213,6 +228,7 @@ export class Connection extends EventEmitter {
213228
? undefined
214229
: Rational.fromNumber(opts.exchangeRate, true)
215230
this.getExpiry = opts.getExpiry || defaultGetExpiry
231+
this.shouldFulfill = opts.shouldFulfill
216232
this.idleTimeout = opts.idleTimeout || DEFAULT_IDLE_TIMEOUT
217233
this.lastActive = new Date()
218234

@@ -239,6 +255,7 @@ export class Connection extends EventEmitter {
239255

240256
this.remoteMaxOffset = this.maxBufferedData
241257

258+
this._incomingHold = Long.UZERO
242259
this._totalReceived = Long.UZERO
243260
this._totalSent = Long.UZERO
244261
this._totalDelivered = Long.UZERO
@@ -476,7 +493,8 @@ export class Connection extends EventEmitter {
476493

477494
/**
478495
* (Internal) Handle incoming ILP Prepare packets.
479-
* This will automatically fulfill all valid and expected Prepare packets.
496+
* This will automatically fulfill all valid and expected Prepare packets, or
497+
* defer to custom application logic using the `shouldFulfill` callback, if provided.
480498
* It passes the incoming money and/or data to the relevant streams.
481499
* @private
482500
*/
@@ -628,9 +646,16 @@ export class Connection extends EventEmitter {
628646
}
629647
}
630648

631-
// Add incoming amounts to each stream
632-
for (let { stream, amount } of amountsToReceive) {
633-
stream._addToIncoming(amount)
649+
this.addIncomingHold(incomingAmount)
650+
651+
// Allow consumer to choose to fulfill each packet and/or perform other logic before fulfilling
652+
if (this.shouldFulfill && incomingAmount.greaterThan(0)) {
653+
const packetId = await cryptoHelper.generateIncomingPacketId(this.sharedSecret, requestPacket.sequence)
654+
await this.shouldFulfill(incomingAmount, packetId, this.connectionTag).catch(async err => {
655+
this.removeIncomingHold(incomingAmount)
656+
this.log.debug('application declined to fulfill packet %s:', requestPacket.sequence, err)
657+
await throwFinalApplicationError()
658+
})
634659
}
635660

636661
// Tell peer about closed streams and how much each stream can receive
@@ -655,12 +680,18 @@ export class Connection extends EventEmitter {
655680
}
656681
}
657682

683+
// Add incoming amounts to each stream
684+
for (let { stream, amount } of amountsToReceive) {
685+
stream._addToIncoming(amount)
686+
}
687+
658688
// TODO make sure the queued frames aren't too big
659689
responseFrames = responseFrames.concat(this.queuedFrames)
660690
this.queuedFrames = []
661691

662692
// Return fulfillment and response packet
663693
const responsePacket = new Packet(requestPacket.sequence, IlpPacketType.Fulfill, incomingAmount, responseFrames)
694+
this.removeIncomingHold(incomingAmount)
664695
this.addTotalReceived(incomingAmount)
665696
this.log.trace('fulfilling prepare with fulfillment: %h and response packet: %j', fulfillment, responsePacket)
666697
return {
@@ -1588,19 +1619,33 @@ export class Connection extends EventEmitter {
15881619

15891620
private bumpIdle (): void { this.lastActive = new Date() }
15901621

1591-
private addTotalReceived (value: Long): void {
1592-
const result = checkedAdd(this._totalReceived, value)
1622+
private addIncomingHold (value: Long): void {
1623+
let result = checkedAdd(this._totalReceived, this._incomingHold)
1624+
result = checkedAdd(result.sum, value)
1625+
15931626
if (result.overflow) {
15941627
const err = new IlpPacket.Errors.BadRequestError('Total received exceeded MaxUint64')
15951628
err['ilpErrorMessage'] = err.message
15961629
/* tslint:disable-next-line:no-floating-promises */
15971630
this.destroy(err)
15981631
throw err
15991632
} else {
1600-
this._totalReceived = result.sum
1633+
this._incomingHold = result.sum
16011634
}
16021635
}
16031636

1637+
private removeIncomingHold (value: Long): void {
1638+
// As long as this is called after `addIncomingHold` for the same amount,
1639+
// this should never underflow
1640+
this._incomingHold = checkedSubtract(this._incomingHold, value).difference
1641+
}
1642+
1643+
private addTotalReceived (value: Long): void {
1644+
// As long as this is called after `addIncomingHold` for the same amount,
1645+
// this should never overflow
1646+
this._totalReceived = checkedAdd(this._totalReceived, value).sum
1647+
}
1648+
16041649
private addTotalSent (value: Long): void {
16051650
const result = checkedAdd(this._totalSent, value)
16061651
if (result.overflow) {

src/crypto.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ export {
1111
const TOKEN_LENGTH = 18
1212
const ENCRYPTION_KEY_STRING = Buffer.from('ilp_stream_encryption', 'utf8')
1313
const FULFILLMENT_GENERATION_STRING = Buffer.from('ilp_stream_fulfillment', 'utf8')
14+
const PACKET_ID_STRING = Buffer.from('ilp_stream_packet_id', 'utf8')
1415
export const ENCRYPTION_OVERHEAD = 28
1516

1617
export function generateToken (): Buffer {
@@ -32,3 +33,7 @@ export function generateFulfillmentKey (sharedSecret: Buffer): Promise<Buffer> {
3233
export function generateFulfillment (fulfillmentKey: Buffer, data: Buffer): Promise<Buffer> {
3334
return hmac(fulfillmentKey, data)
3435
}
36+
37+
export function generateIncomingPacketId (sharedSecret: Buffer, sequence: Long): Promise<Buffer> {
38+
return hmac(sharedSecret, Buffer.concat([PACKET_ID_STRING, Buffer.from(sequence.toBytes())]))
39+
}

src/index.ts

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,8 @@
1-
import { EventEmitter } from 'events'
21
import * as ILDCP from 'ilp-protocol-ildcp'
32
import * as IlpPacket from 'ilp-packet'
43
import createLogger from 'ilp-logger'
54
import './util/formatters'
6-
import * as cryptoHelper from './crypto'
75
import { Connection, ConnectionOpts } from './connection'
8-
import { Plugin } from './util/plugin-interface'
9-
10-
const CONNECTION_ID_REGEX = /^[a-zA-Z0-9~_-]+$/
116

127
export { Connection } from './connection'
138
export { DataAndMoneyStream } from './stream'

src/pool.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,8 @@ export class ServerConnectionPool {
5959
const conn = await Connection.build({
6060
...this.connectionOpts,
6161
sharedSecret,
62-
connectionTag
62+
connectionTag,
63+
connectionId: id
6364
})
6465
log.debug('got incoming packet for new connection: %s%s', id, (connectionTag ? ' (connectionTag: ' + connectionTag + ')' : ''))
6566
try {

src/server.ts

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,16 @@ import { ServerConnectionPool } from './pool'
88
import { Plugin } from './util/plugin-interface'
99

1010
const CONNECTION_ID_REGEX = /^[a-zA-Z0-9~_-]+$/
11+
const DEFAULT_DISCONNECT_DELAY = 100
1112

1213
export interface ServerOpts extends ConnectionOpts {
1314
serverSecret?: Buffer
15+
16+
/**
17+
* Number of milliseconds to wait between closing the server and disconnecting
18+
* the plugin so packets may be safely returned
19+
*/
20+
disconnectDelay?: number
1421
}
1522

1623
/**
@@ -32,6 +39,8 @@ export class Server extends EventEmitter {
3239
protected enablePadding?: boolean
3340
protected connected: boolean
3441
protected connectionOpts: ConnectionOpts
42+
protected pendingRequests: Promise<any> = Promise.resolve()
43+
protected disconnectDelay: number
3544
private pool: ServerConnectionPool
3645

3746
constructor (opts: ServerOpts) {
@@ -42,6 +51,7 @@ export class Server extends EventEmitter {
4251
this.connectionOpts = Object.assign({}, opts, {
4352
serverSecret: undefined
4453
}) as ConnectionOpts
54+
this.disconnectDelay = opts.disconnectDelay || DEFAULT_DISCONNECT_DELAY
4555
this.connected = false
4656
}
4757

@@ -64,7 +74,12 @@ export class Server extends EventEmitter {
6474
if (this.connected && this.plugin.isConnected()) {
6575
return
6676
}
67-
this.plugin.registerDataHandler(this.handleData.bind(this))
77+
this.plugin.registerDataHandler(data => {
78+
this.emit('_incoming_prepare')
79+
const request = this.handleData(data)
80+
this.pendingRequests = this.pendingRequests.then(() => request.finally())
81+
return request
82+
})
6883
await this.plugin.connect()
6984
const { clientAddress, assetCode, assetScale } = await ILDCP.fetch(this.plugin.sendData.bind(this.plugin))
7085
this.serverAccount = clientAddress
@@ -87,9 +102,28 @@ export class Server extends EventEmitter {
87102
* End all connections and disconnect the plugin
88103
*/
89104
async close (): Promise<void> {
105+
// Stop handling new requests, and return T99 while the connection is closing.
106+
// If an F02 unreachable was returned on new packets: clients would immediately destroy the connection
107+
// If an F99 was returned on on new packets: clients would retry with no backoff
108+
this.plugin.deregisterDataHandler()
109+
this.plugin.registerDataHandler(async () => IlpPacket.serializeIlpReject({
110+
code: IlpPacket.Errors.codes.T99_APPLICATION_ERROR,
111+
triggeredBy: this.serverAccount,
112+
message: 'Shutting down server',
113+
data: Buffer.alloc(0)
114+
}))
115+
116+
// Wait for in-progress requests to finish so all Fulfills are returned
117+
await this.pendingRequests
118+
// Allow the plugin time to send the reply packets back before disconnecting it
119+
await new Promise(r => setTimeout(r, this.disconnectDelay))
120+
121+
// Gracefully close the connection and all streams
90122
await this.pool.close()
123+
91124
this.plugin.deregisterDataHandler()
92125
await this.plugin.disconnect()
126+
93127
this.emit('_close')
94128
this.connected = false
95129
}

0 commit comments

Comments
 (0)