diff --git a/.changeset/late-queens-check.md b/.changeset/late-queens-check.md new file mode 100644 index 0000000000..2606b0e0fd --- /dev/null +++ b/.changeset/late-queens-check.md @@ -0,0 +1,5 @@ +--- +'@chainlink/deutsche-boerse-adapter': minor +--- + +Split the endpoints and added support for tradegate diff --git a/packages/sources/deutsche-boerse/src/endpoint/index.ts b/packages/sources/deutsche-boerse/src/endpoint/index.ts index 8d7a7b68f6..6b5eefa57c 100644 --- a/packages/sources/deutsche-boerse/src/endpoint/index.ts +++ b/packages/sources/deutsche-boerse/src/endpoint/index.ts @@ -1 +1,2 @@ export { endpoint as lwba } from './lwba' +export { endpoint as price } from './price' diff --git a/packages/sources/deutsche-boerse/src/endpoint/lwba.ts b/packages/sources/deutsche-boerse/src/endpoint/lwba.ts index fddcc5db80..20c5b6270c 100644 --- a/packages/sources/deutsche-boerse/src/endpoint/lwba.ts +++ b/packages/sources/deutsche-boerse/src/endpoint/lwba.ts @@ -1,9 +1,9 @@ -import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter/endpoint' import { InputParameters } from '@chainlink/external-adapter-framework/validation' import { config } from '../config' -import { wsTransport } from '../transport/lwba' +import { lwbaProtobufWsTransport } from '../transport/lwba' -export const MARKETS = ['md-xetraetfetp'] as const +export const MARKETS = ['md-xetraetfetp', 'md-tradegate'] as const export type Market = (typeof MARKETS)[number] export const inputParameters = new InputParameters( @@ -30,27 +30,26 @@ export const inputParameters = new InputParameters( ], ) -interface LwbaLatestPriceResponse { +interface LwbaResponse { Result: number | null Data: { mid: number bid: number ask: number - latestPrice: number - quoteProviderIndicatedTimeUnixMs: number - tradeProviderIndicatedTimeUnixMs: number + bidSize: number + askSize: number } } export type BaseEndpointTypes = { Parameters: typeof inputParameters.definition - Response: LwbaLatestPriceResponse + Response: LwbaResponse Settings: typeof config.settings } export const endpoint = new AdapterEndpoint({ name: 'lwba', aliases: [], - transport: wsTransport, + transport: lwbaProtobufWsTransport, inputParameters, }) diff --git a/packages/sources/deutsche-boerse/src/endpoint/price.ts b/packages/sources/deutsche-boerse/src/endpoint/price.ts new file mode 100644 index 0000000000..1c2f1c1d81 --- /dev/null +++ b/packages/sources/deutsche-boerse/src/endpoint/price.ts @@ -0,0 +1,24 @@ +import { AdapterEndpoint } from '@chainlink/external-adapter-framework/adapter' +import { config } from '../config' +import { priceProtobufWsTransport } from '../transport/price' +import { inputParameters } from './lwba' + +export interface PriceResponse { + Result: number | null + Data: { + latestPrice: number + } +} + +export type BaseEndpointTypes = { + Parameters: typeof inputParameters.definition + Response: PriceResponse + Settings: typeof config.settings +} + +export const endpoint = new AdapterEndpoint({ + name: 'price', + aliases: [], + transport: priceProtobufWsTransport, + inputParameters, +}) diff --git a/packages/sources/deutsche-boerse/src/index.ts b/packages/sources/deutsche-boerse/src/index.ts index 49f51bc0d0..c7d46b0091 100644 --- a/packages/sources/deutsche-boerse/src/index.ts +++ b/packages/sources/deutsche-boerse/src/index.ts @@ -1,13 +1,13 @@ import { expose, ServerInstance } from '@chainlink/external-adapter-framework' import { Adapter } from '@chainlink/external-adapter-framework/adapter' import { config } from './config' -import { lwba } from './endpoint' +import { lwba, price } from './endpoint' export const adapter = new Adapter({ defaultEndpoint: lwba.name, name: 'DEUTSCHE_BOERSE', config, - endpoints: [lwba], + endpoints: [lwba, price], }) export const server = (): Promise => expose(adapter) diff --git a/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts b/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts index 5de1c7903b..649d9eec1a 100644 --- a/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts +++ b/packages/sources/deutsche-boerse/src/transport/instrument-quote-cache.ts @@ -4,6 +4,8 @@ export type Quote = { bid?: number ask?: number mid?: number + bidSize?: number + askSize?: number latestPrice?: number quoteProviderTimeUnixMs?: number tradeProviderTimeUnixMs?: number @@ -12,31 +14,49 @@ export type Quote = { export class InstrumentQuoteCache { private readonly map = new Map() - activate(isin: string) { - if (!this.map.has(isin)) this.map.set(isin, {}) + private createKey(market: string, isin: string): string { + return `${market}-${isin}` } - deactivate(isin: string) { - this.map.delete(isin) + + activate(market: string, isin: string) { + const key = this.createKey(market, isin) + if (!this.map.has(key)) this.map.set(key, {}) + } + deactivate(market: string, isin: string) { + const key = this.createKey(market, isin) + this.map.delete(key) } - has(isin: string): boolean { - return this.map.has(isin) + has(market: string, isin: string): boolean { + const key = this.createKey(market, isin) + return this.map.has(key) } - get(isin: string): Quote | undefined { - return this.map.get(isin) + get(market: string, isin: string): Quote | undefined { + const key = this.createKey(market, isin) + return this.map.get(key) } - addQuote(isin: string, bid: number, ask: number, providerTime: number) { - const quote = this.get(isin) + addQuote( + market: string, + isin: string, + bid: number, + ask: number, + providerTime: number, + bidSz: number, + askSz: number, + ) { + const quote = this.get(market, isin) if (!quote) { - throw new Error(`Cannot add quote for inactive ISIN ${isin}`) + throw new Error(`Cannot add quote for inactive instrument ${market}-${isin}`) } const mid = new Decimal(bid).plus(ask).div(2) quote.bid = bid quote.ask = ask quote.mid = mid.toNumber() quote.quoteProviderTimeUnixMs = providerTime + quote.bidSize = bidSz + quote.askSize = askSz } - addBid(isin: string, bid: number, providerTime: number) { - const quote = this.get(isin) + addBid(market: string, isin: string, bid: number, providerTime: number, bidSz?: number) { + const quote = this.get(market, isin) if (!quote) { throw new Error(`Cannot add quote for inactive ISIN ${isin}`) } @@ -46,9 +66,10 @@ export class InstrumentQuoteCache { } quote.bid = bid quote.quoteProviderTimeUnixMs = providerTime + quote.bidSize = bidSz } - addAsk(isin: string, ask: number, providerTime: number) { - const quote = this.get(isin) + addAsk(market: string, isin: string, ask: number, providerTime: number, askSz?: number) { + const quote = this.get(market, isin) if (!quote) { throw new Error(`Cannot add quote for inactive ISIN ${isin}`) } @@ -59,11 +80,13 @@ export class InstrumentQuoteCache { } quote.ask = ask quote.quoteProviderTimeUnixMs = providerTime + quote.askSize = askSz } - addTrade(isin: string, lastPrice: number, providerTime: number) { - const quote = this.get(isin) + + addTrade(market: string, isin: string, lastPrice: number, providerTime: number) { + const quote = this.get(market, isin) if (!quote) { - throw new Error(`Cannot add trade for inactive ISIN ${isin}`) + throw new Error(`Cannot add trade for inactive instrument ${market}-${isin}`) } quote.latestPrice = lastPrice quote.tradeProviderTimeUnixMs = providerTime diff --git a/packages/sources/deutsche-boerse/src/transport/lwba.ts b/packages/sources/deutsche-boerse/src/transport/lwba.ts index 5b6c719daa..cbe6b36b4e 100644 --- a/packages/sources/deutsche-boerse/src/transport/lwba.ts +++ b/packages/sources/deutsche-boerse/src/transport/lwba.ts @@ -1,7 +1,9 @@ import { create, fromBinary, toBinary } from '@bufbuild/protobuf' -import { WebSocketTransport } from '@chainlink/external-adapter-framework/transports' +import { TransportGenerics } from '@chainlink/external-adapter-framework/transports' import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { config } from '../config' import { BaseEndpointTypes, Market, MARKETS } from '../endpoint/lwba' +import { BaseEndpointTypes as PriceBaseEndpointTypes } from '../endpoint/price' import { RequestSchema, StreamMessageSchema, @@ -10,7 +12,7 @@ import { type StreamMessage, } from '../gen/client_pb' import { MarketDataSchema, type MarketData } from '../gen/md_cef_pb' -import { InstrumentQuoteCache } from './instrument-quote-cache' +import { InstrumentQuoteCache, Quote } from './instrument-quote-cache' import { decimalToNumber, hasSingleBidFrame, @@ -21,15 +23,23 @@ import { } from './proto-utils' import { ProtobufWsTransport } from './protobuf-wstransport' -export type WsTransportTypes = BaseEndpointTypes & { +export type WsTransportTypes = (BaseEndpointTypes | PriceBaseEndpointTypes) & { Provider: { WsMessage: Buffer } } +type BaseTransportTypes = { + Parameters: TransportGenerics['Parameters'] + Response: TransportGenerics['Response'] + Settings: TransportGenerics['Settings'] & typeof config.settings +} + const logger = makeLogger('DeutscheBoerseTransport') -export function createLwbaWsTransport() { +export function createLwbaWsTransport( + extractData: (quote: Quote) => BaseEndpointTypes['Response']['Data'], +) { const cache = new InstrumentQuoteCache() let ttlInterval: ReturnType | undefined const transport = new ProtobufWsTransport({ @@ -96,45 +106,26 @@ export function createLwbaWsTransport() { return [] } const { market, md } = decoded - const result = processMarketData(md, cache) + const result = processMarketData(md, cache, market) if (!result) { return [] } const { isin, providerTime } = result - const quote = cache.get(isin) + const quote = cache.get(market, isin) if (quote == null) { logger.error({ isin, market }, 'Quote missing from cache after processing frame') return [] } - if ( - quote.mid == null || - quote.ask == null || - quote.bid == null || - quote.latestPrice == null || - quote.quoteProviderTimeUnixMs == null || - quote.tradeProviderTimeUnixMs == null - ) { - logger.error( - { isin, market }, - 'Neither mid nor latestPrice present after processing frame', - ) - logger.debug({ isin, market }, 'Awaiting complete quote before emitting') + const responseData = extractData(quote) + if (!responseData) { return [] } - return [ { params: { isin, market }, response: { result: null, - data: { - mid: quote.mid, - bid: quote.bid, - ask: quote.ask, - latestPrice: quote.latestPrice, - quoteProviderIndicatedTimeUnixMs: quote.quoteProviderTimeUnixMs, - tradeProviderIndicatedTimeUnixMs: quote.tradeProviderTimeUnixMs, - }, + data: responseData as WsTransportTypes['Response']['Data'], timestamps: { providerIndicatedTimeUnixMs: providerTime }, }, }, @@ -144,7 +135,7 @@ export function createLwbaWsTransport() { builders: { subscribeMessage: (p: { market: string; isin: string }) => { if (cache.isEmpty()) { - cache.activate(p.isin) + cache.activate(p.market, p.isin) const req = create(RequestSchema, { event: 'subscribe', requestId: BigInt(Date.now()), @@ -158,7 +149,7 @@ export function createLwbaWsTransport() { ) return toBinary(RequestSchema, req) } - cache.activate(p.isin) + cache.activate(p.market, p.isin) logger.debug( { isin: p.isin, market: p.market }, 'Instrument activated; stream already subscribed, no outbound subscribe message sent', @@ -167,7 +158,7 @@ export function createLwbaWsTransport() { }, unsubscribeMessage: (p: { market: string; isin: string }) => { - cache.deactivate(p.isin) + cache.deactivate(p.market, p.isin) if (cache.isEmpty()) { const req = create(RequestSchema, { event: 'unsubscribe', @@ -201,36 +192,40 @@ function decodeStreamMessage(buf: Buffer): StreamMessage | null { } } -const updateTTL = async (transport: WebSocketTransport, ttl: number) => { +const updateTTL = async (transport: ProtobufWsTransport, ttl: number) => { const params = await transport.subscriptionSet.getAll() transport.responseCache.writeTTL(transport.name, params, ttl) } function processMarketData( md: MarketData, cache: InstrumentQuoteCache, + market: string, ): { isin: string providerTime: number } | null { const isin = parseIsin(md) - const dat: any = (md as any)?.Dat ?? {} - if (!isin) { - logger.warn({ md }, 'Could not parse ISIN from MarketData.Instrmt.Sym') + logger.warn('Could not parse ISIN from MarketData') + return null + } + const dat: any = (md as MarketData)?.Dat + if (!dat) { + logger.warn('Could not parse MarketData from MarketData.Instrmt') return null } - const quote = cache.get(isin) + const quote = cache.get(market, isin) if (!quote) { - logger.debug({ isin }, 'Ignoring message for inactive instrument (not in cache)') + logger.debug('Ignoring message for inactive instrument (not in cache)') return null } const providerTime = pickProviderTime(dat) if (isSingleTradeFrame(dat)) { - const latestPrice = decimalToNumber(dat.Px) - cache.addTrade(isin, latestPrice, providerTime) + const latestPrice = decimalToNumber(dat!.Px) + cache.addTrade(market, isin, latestPrice, providerTime) logger.debug( { isin, latestPrice, providerTimeUnixMs: providerTime }, 'Processed single trade frame', @@ -240,7 +235,9 @@ function processMarketData( if (hasSingleBidFrame(dat) && hasSingleOfferFrame(dat)) { const bidPx = decimalToNumber(dat!.Bid!.Px) const askPx = decimalToNumber(dat!.Offer!.Px) - cache.addQuote(isin, bidPx, askPx, providerTime) + const bidSz = decimalToNumber(dat!.Bid!.Sz) + const askSz = decimalToNumber(dat!.Offer!.Sz) + cache.addQuote(market, isin, bidPx, askPx, providerTime, bidSz, askSz) logger.debug( { isin, bid: bidPx, ask: askPx, mid: (bidPx + askPx) / 2, providerTimeUnixMs: providerTime }, 'Processed single quote frame', @@ -249,7 +246,8 @@ function processMarketData( } if (hasSingleBidFrame(dat)) { const bidPx = decimalToNumber(dat!.Bid!.Px) - cache.addBid(isin, bidPx, providerTime) + const bidSz = decimalToNumber(dat!.Bid!.Sz) + cache.addBid(market, isin, bidPx, providerTime, bidSz) logger.debug( { isin, bid: bidPx, providerTimeUnixMs: providerTime }, 'Processed single bid frame', @@ -259,7 +257,8 @@ function processMarketData( if (hasSingleOfferFrame(dat)) { const askPx = decimalToNumber(dat!.Offer!.Px) - cache.addAsk(isin, askPx, providerTime) + const askSz = decimalToNumber(dat!.Offer!.Sz) + cache.addAsk(market, isin, askPx, providerTime, askSz) logger.debug( { isin, ask: askPx, providerTimeUnixMs: providerTime }, 'Processed single offer frame', @@ -298,4 +297,23 @@ function decodeSingleMarketData(sm: StreamMessage): { market: Market; md: Market function isMarket(x: string): x is Market { return (MARKETS as readonly string[]).includes(x) } -export const wsTransport = createLwbaWsTransport() + +export const lwbaProtobufWsTransport = createLwbaWsTransport((quote) => { + if ( + quote.bid == null || + quote.ask == null || + quote.mid == null || + quote.bidSize == null || + quote.askSize == null + ) { + return undefined + } + + return { + bid: quote.bid, + ask: quote.ask, + mid: quote.mid, + bidSize: quote.bidSize, + askSize: quote.askSize, + } +}) diff --git a/packages/sources/deutsche-boerse/src/transport/price.ts b/packages/sources/deutsche-boerse/src/transport/price.ts new file mode 100644 index 0000000000..e32d7a244e --- /dev/null +++ b/packages/sources/deutsche-boerse/src/transport/price.ts @@ -0,0 +1,11 @@ +import { createLwbaWsTransport } from './lwba' + +export const priceProtobufWsTransport = createLwbaWsTransport((quote) => { + if (quote.latestPrice == null) { + return undefined + } + + return { + latestPrice: quote.latestPrice, + } +}) diff --git a/packages/sources/deutsche-boerse/src/transport/proto-utils.ts b/packages/sources/deutsche-boerse/src/transport/proto-utils.ts index d85a4404a0..d4579f0ffe 100644 --- a/packages/sources/deutsche-boerse/src/transport/proto-utils.ts +++ b/packages/sources/deutsche-boerse/src/transport/proto-utils.ts @@ -7,7 +7,7 @@ import type { const MAX_SIG_DIGITS = 15 export function decimalToNumber(decimal?: DecimalProto): number { - if (!decimal || decimal.m === undefined || decimal.e === undefined || decimal.m < 0) { + if (!decimal || decimal.m === undefined || decimal.e === undefined) { throw new Error('Invalid price') } diff --git a/packages/sources/deutsche-boerse/test/integration/__snapshots__/adapter.test.ts.snap b/packages/sources/deutsche-boerse/test/integration/__snapshots__/adapter.test.ts.snap index 7c9f0c7c4f..3e847f6bbf 100644 --- a/packages/sources/deutsche-boerse/test/integration/__snapshots__/adapter.test.ts.snap +++ b/packages/sources/deutsche-boerse/test/integration/__snapshots__/adapter.test.ts.snap @@ -1,14 +1,13 @@ // Jest Snapshot v1, https://goo.gl/fbAQLP -exports[`websocket lwba endpoint returns success and exposes quote/trade fields 1`] = ` +exports[`websocket lwba endpoint returns success and exposes bid/ask/mid fields 1`] = ` { "data": { "ask": 101, + "askSize": 1000, "bid": 100, - "latestPrice": 100.1, + "bidSize": 2000, "mid": 100.5, - "quoteProviderIndicatedTimeUnixMs": 5, - "tradeProviderIndicatedTimeUnixMs": 6, }, "result": null, "statusCode": 200, @@ -19,3 +18,18 @@ exports[`websocket lwba endpoint returns success and exposes quote/trade fields }, } `; + +exports[`websocket price endpoint returns success and exposes latestPrice 1`] = ` +{ + "data": { + "latestPrice": 100.1, + }, + "result": null, + "statusCode": 200, + "timestamps": { + "providerDataReceivedUnixMs": 2028, + "providerDataStreamEstablishedUnixMs": 2020, + "providerIndicatedTimeUnixMs": 6, + }, +} +`; diff --git a/packages/sources/deutsche-boerse/test/integration/adapter.test.ts b/packages/sources/deutsche-boerse/test/integration/adapter.test.ts index 7a49ff59ba..d87c9eae2f 100644 --- a/packages/sources/deutsche-boerse/test/integration/adapter.test.ts +++ b/packages/sources/deutsche-boerse/test/integration/adapter.test.ts @@ -46,7 +46,7 @@ describe('websocket', () => { }) describe('lwba endpoint', () => { - it('returns success and exposes quote/trade fields', async () => { + it('returns success and exposes bid/ask/mid fields', async () => { const response = await testAdapter.request(dataLwba) const body = response.json() @@ -57,16 +57,32 @@ describe('websocket', () => { expect(d).toHaveProperty('bid') expect(d).toHaveProperty('ask') expect(d).toHaveProperty('mid') + expect(d).toHaveProperty('bidSize') + expect(d).toHaveProperty('askSize') + expect(typeof d.bid).toBe('number') + expect(typeof d.ask).toBe('number') + expect(typeof d.mid).toBe('number') + expect(typeof d.bidSize).toBe('number') + expect(typeof d.askSize).toBe('number') + expect(body).toMatchSnapshot() + }) + }) + + describe('price endpoint', () => { + it('returns success and exposes latestPrice', async () => { + const dataPrice = { + ...dataLwba, + endpoint: 'price', + } + const response = await testAdapter.request(dataPrice) + const body = response.json() + + expect(response.statusCode).toBe(200) + expect(body.statusCode).toBe(200) + expect(body).toHaveProperty('data') + const d = body.data expect(d).toHaveProperty('latestPrice') - expect(d).toHaveProperty('quoteProviderIndicatedTimeUnixMs') - expect(d).toHaveProperty('tradeProviderIndicatedTimeUnixMs') - const numOrNull = (v: unknown) => v === null || typeof v === 'number' - expect(numOrNull(d.bid)).toBe(true) - expect(numOrNull(d.ask)).toBe(true) - expect(numOrNull(d.mid)).toBe(true) - expect(numOrNull(d.latestPrice)).toBe(true) - expect(numOrNull(d.quoteProviderIndicatedTimeUnixMs)).toBe(true) - expect(numOrNull(d.tradeProviderIndicatedTimeUnixMs)).toBe(true) + expect(typeof d.latestPrice).toBe('number') expect(body).toMatchSnapshot() }) }) diff --git a/packages/sources/deutsche-boerse/test/integration/fixtures.ts b/packages/sources/deutsche-boerse/test/integration/fixtures.ts index b84a4e2ad5..b3726320e0 100644 --- a/packages/sources/deutsche-boerse/test/integration/fixtures.ts +++ b/packages/sources/deutsche-boerse/test/integration/fixtures.ts @@ -35,8 +35,8 @@ const ack = (requestId: bigint, seq: bigint): Uint8Array => const quoteFrame = (seq: bigint): Uint8Array => { const dat = create(DataSchema, { - Bid: { Px: dec(10000n, -2) }, // 100.00 - Offer: { Px: dec(10100n, -2) }, // 101.00 + Bid: { Px: dec(10000n, -2), Sz: dec(2000n, 0) }, // 100.00, size 2000 + Offer: { Px: dec(10100n, -2), Sz: dec(1000n, 0) }, // 101.00, size 1000 Tm: 5_000_000n, // 5 ms } as any) const md = create(MarketDataSchema, { Instrmt: { Sym: TEST_ISIN }, Dat: dat } as any) diff --git a/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts b/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts index 4c76e03fb8..0d4b78880e 100644 --- a/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts +++ b/packages/sources/deutsche-boerse/test/unit/instrument-quote-cache.test.ts @@ -1,128 +1,212 @@ import { InstrumentQuoteCache } from '../../src/transport/instrument-quote-cache' describe('InstrumentQuoteCache', () => { + const MARKET = 'md-tradegate' + const MARKET2 = 'md-xetraetfetp' const ISIN = 'IE00B53L3W79' const ISIN2 = 'US0000000001' test('activate/deactivate/has/isEmpty/get', () => { const cache = new InstrumentQuoteCache() expect(cache.isEmpty()).toBe(true) - cache.activate(ISIN) - expect(cache.has(ISIN)).toBe(true) - expect(cache.get(ISIN)).toEqual({}) + cache.activate(MARKET, ISIN) + expect(cache.has(MARKET, ISIN)).toBe(true) + expect(cache.get(MARKET, ISIN)).toEqual({}) expect(cache.isEmpty()).toBe(false) - cache.deactivate(ISIN) - expect(cache.has(ISIN)).toBe(false) + cache.deactivate(MARKET, ISIN) + expect(cache.has(MARKET, ISIN)).toBe(false) expect(cache.isEmpty()).toBe(true) }) test('addQuote sets bid/ask/mid and quote time', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) + cache.activate(MARKET, ISIN) - cache.addQuote(ISIN, 100, 102, 1234) - const q = cache.get(ISIN)! + cache.addQuote(MARKET, ISIN, 100, 102, 1234, 1000, 2000) + const q = cache.get(MARKET, ISIN)! expect(q.bid).toBe(100) expect(q.ask).toBe(102) expect(q.mid).toBe(101) + expect(q.bidSize).toBe(1000) + expect(q.askSize).toBe(2000) expect(q.quoteProviderTimeUnixMs).toBe(1234) }) test('addBid then addAsk recomputes mid and updates quote time', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) + cache.activate(MARKET, ISIN) - cache.addBid(ISIN, 100, 1111) // only bid - let q = cache.get(ISIN)! + cache.addBid(MARKET, ISIN, 100, 1111, 500) // only bid + let q = cache.get(MARKET, ISIN)! expect(q.bid).toBe(100) expect(q.ask).toBeUndefined() expect(q.mid).toBeUndefined() + expect(q.bidSize).toBe(500) + expect(q.askSize).toBeUndefined() expect(q.quoteProviderTimeUnixMs).toBe(1111) - cache.addAsk(ISIN, 102, 2222) // now ask arrives - q = cache.get(ISIN)! + cache.addAsk(MARKET, ISIN, 102, 2222, 750) // now ask arrives + q = cache.get(MARKET, ISIN)! expect(q.ask).toBe(102) expect(q.mid).toBe(101) + expect(q.askSize).toBe(750) expect(q.quoteProviderTimeUnixMs).toBe(2222) }) test('addAsk then addBid recomputes mid and updates quote time', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) + cache.activate(MARKET, ISIN) - cache.addAsk(ISIN, 50, 3333) - let q = cache.get(ISIN)! + cache.addAsk(MARKET, ISIN, 50, 3333, 300) + let q = cache.get(MARKET, ISIN)! expect(q.ask).toBe(50) expect(q.mid).toBeUndefined() + expect(q.askSize).toBe(300) - cache.addBid(ISIN, 48, 4444) - q = cache.get(ISIN)! + cache.addBid(MARKET, ISIN, 48, 4444, 400) + q = cache.get(MARKET, ISIN)! expect(q.bid).toBe(48) expect(q.mid).toBe(49) + expect(q.bidSize).toBe(400) expect(q.quoteProviderTimeUnixMs).toBe(4444) }) test('addTrade sets latestPrice and trade time', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) + cache.activate(MARKET, ISIN) - cache.addTrade(ISIN, 99.5, 2222) - const q = cache.get(ISIN)! + cache.addTrade(MARKET, ISIN, 99.5, 2222) + const q = cache.get(MARKET, ISIN)! expect(q.latestPrice).toBe(99.5) expect(q.tradeProviderTimeUnixMs).toBe(2222) }) test('addQuote/addBid/addAsk/addTrade without activate throws', () => { const cache = new InstrumentQuoteCache() - expect(() => cache.addQuote(ISIN, 100, 102, 1234)).toThrow(/inactive isin/i) - expect(() => cache.addBid(ISIN, 100, 1)).toThrow(/inactive isin/i) - expect(() => cache.addAsk(ISIN, 100, 1)).toThrow(/inactive isin/i) - expect(() => cache.addTrade(ISIN, 99.5, 2222)).toThrow(/inactive isin/i) + expect(() => cache.addQuote(MARKET, ISIN, 100, 102, 1234, 500, 600)).toThrow( + /inactive instrument/i, + ) + expect(() => cache.addBid(MARKET, ISIN, 100, 1)).toThrow(/inactive isin/i) + expect(() => cache.addAsk(MARKET, ISIN, 100, 1)).toThrow(/inactive isin/i) + expect(() => cache.addTrade(MARKET, ISIN, 99.5, 2222)).toThrow(/inactive instrument/i) }) test('deactivate then attempt to add -> throws', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) - cache.deactivate(ISIN) - expect(() => cache.addQuote(ISIN, 1, 2, 3)).toThrow(/inactive isin/i) - expect(() => cache.addTrade(ISIN, 1, 3)).toThrow(/inactive isin/i) + cache.activate(MARKET, ISIN) + cache.deactivate(MARKET, ISIN) + expect(() => cache.addQuote(MARKET, ISIN, 1, 2, 3, 4, 5)).toThrow(/inactive instrument/i) + expect(() => cache.addTrade(MARKET, ISIN, 1, 3)).toThrow(/inactive instrument/i) }) test('mid is computed correctly for equal sides and edge values', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) - cache.addQuote(ISIN, 0, 0, 123) - const q = cache.get(ISIN)! + cache.activate(MARKET, ISIN) + cache.addQuote(MARKET, ISIN, 0, 0, 123, 100, 200) + const q = cache.get(MARKET, ISIN)! expect(q.bid).toBe(0) expect(q.ask).toBe(0) expect(q.mid).toBe(0) + expect(q.bidSize).toBe(100) + expect(q.askSize).toBe(200) expect(q.quoteProviderTimeUnixMs).toBe(123) }) test('multiple instruments lifecycle', () => { const cache = new InstrumentQuoteCache() - cache.activate(ISIN) - cache.activate(ISIN2) - expect(cache.has(ISIN)).toBe(true) - expect(cache.has(ISIN2)).toBe(true) + cache.activate(MARKET, ISIN) + cache.activate(MARKET2, ISIN2) + expect(cache.has(MARKET, ISIN)).toBe(true) + expect(cache.has(MARKET2, ISIN2)).toBe(true) expect(cache.isEmpty()).toBe(false) - cache.addQuote(ISIN, 100, 101, 10) - cache.addTrade(ISIN2, 55, 20) + cache.addQuote(MARKET, ISIN, 100, 101, 10, 1500, 1600) + cache.addTrade(MARKET2, ISIN2, 55, 20) - const q1 = cache.get(ISIN)! - const q2 = cache.get(ISIN2)! + const q1 = cache.get(MARKET, ISIN)! + const q2 = cache.get(MARKET2, ISIN2)! expect(q1.mid).toBe(100.5) + expect(q1.bidSize).toBe(1500) + expect(q1.askSize).toBe(1600) expect(q1.quoteProviderTimeUnixMs).toBe(10) expect(q2.latestPrice).toBe(55) expect(q2.tradeProviderTimeUnixMs).toBe(20) - cache.deactivate(ISIN) - expect(cache.has(ISIN)).toBe(false) + cache.deactivate(MARKET, ISIN) + expect(cache.has(MARKET, ISIN)).toBe(false) expect(cache.isEmpty()).toBe(false) - cache.deactivate(ISIN2) + cache.deactivate(MARKET2, ISIN2) expect(cache.isEmpty()).toBe(true) }) + + test('same ISIN in different markets are stored separately', () => { + const cache = new InstrumentQuoteCache() + cache.activate(MARKET, ISIN) + cache.activate(MARKET2, ISIN) // Same ISIN, different market + + cache.addQuote(MARKET, ISIN, 100, 101, 10, 800, 900) + cache.addTrade(MARKET2, ISIN, 200, 20) + + const q1 = cache.get(MARKET, ISIN)! + const q2 = cache.get(MARKET2, ISIN)! + + expect(q1.mid).toBe(100.5) + expect(q1.bidSize).toBe(800) + expect(q1.askSize).toBe(900) + expect(q1.quoteProviderTimeUnixMs).toBe(10) + expect(q1.latestPrice).toBeUndefined() // No trade data for this market + + expect(q2.latestPrice).toBe(200) + expect(q2.tradeProviderTimeUnixMs).toBe(20) + expect(q2.mid).toBeUndefined() // No quote data for this market + expect(q2.bidSize).toBeUndefined() + expect(q2.askSize).toBeUndefined() + + expect(cache.isEmpty()).toBe(false) + cache.deactivate(MARKET, ISIN) + expect(cache.has(MARKET, ISIN)).toBe(false) + expect(cache.has(MARKET2, ISIN)).toBe(true) // Other market still active + expect(cache.isEmpty()).toBe(false) + + cache.deactivate(MARKET2, ISIN) + expect(cache.isEmpty()).toBe(true) + }) + + test('bidSize and askSize are properly handled for individual bid/ask updates', () => { + const cache = new InstrumentQuoteCache() + cache.activate(MARKET, ISIN) + + // Add bid with size + cache.addBid(MARKET, ISIN, 95, 1000, 1200) + let q = cache.get(MARKET, ISIN)! + expect(q.bid).toBe(95) + expect(q.bidSize).toBe(1200) + expect(q.ask).toBeUndefined() + expect(q.askSize).toBeUndefined() + + // Add ask with size + cache.addAsk(MARKET, ISIN, 105, 2000, 1300) + q = cache.get(MARKET, ISIN)! + expect(q.ask).toBe(105) + expect(q.askSize).toBe(1300) + expect(q.mid).toBe(100) + expect(q.quoteProviderTimeUnixMs).toBe(2000) + + // Update bid without size (should be undefined) + cache.addBid(MARKET, ISIN, 96, 3000) + q = cache.get(MARKET, ISIN)! + expect(q.bid).toBe(96) + expect(q.bidSize).toBeUndefined() + expect(q.mid).toBe(100.5) + + // Update ask without size (should be undefined) + cache.addAsk(MARKET, ISIN, 106, 4000) + q = cache.get(MARKET, ISIN)! + expect(q.ask).toBe(106) + expect(q.askSize).toBeUndefined() + expect(q.mid).toBe(101) + expect(q.quoteProviderTimeUnixMs).toBe(4000) + }) }) diff --git a/packages/sources/deutsche-boerse/test/unit/lwba.test.ts b/packages/sources/deutsche-boerse/test/unit/lwba.test.ts index 538daf208b..9f402f2467 100644 --- a/packages/sources/deutsche-boerse/test/unit/lwba.test.ts +++ b/packages/sources/deutsche-boerse/test/unit/lwba.test.ts @@ -9,7 +9,8 @@ import { type Decimal, type MarketData, } from '../../src/gen/md_cef_pb' -import { createLwbaWsTransport } from '../../src/transport/lwba' // keep your existing path +import { createLwbaWsTransport, lwbaProtobufWsTransport } from '../../src/transport/lwba' +import { priceProtobufWsTransport } from '../../src/transport/price' LoggerFactoryProvider.set() @@ -29,15 +30,25 @@ function makeStreamBuffer(md: MarketData | MarketDataInit): Buffer { return Buffer.from(toBinary(StreamMessageSchema, sm)) } -describe('LWBA websocket transport', () => { +describe('LWBA websocket transport base functionality', () => { + // Test the base transport functionality using a simplified extract function + const mockExtractData = (quote: any) => { + if (quote.latestPrice == null) { + return undefined + } + return { + latestPrice: quote.latestPrice, + } + } + test('message for non-activated instrument returns []', () => { - const t = createLwbaWsTransport() as any + const t = createLwbaWsTransport(mockExtractData) as any const md = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, Dat: create(DataSchema, { - Bid: { Px: dec(10000n, -2) }, - Offer: { Px: dec(10100n, -2) }, - Tm: 1_000_000n, + Bid: { Px: dec(BigInt(10000), -2) }, + Offer: { Px: dec(BigInt(10100), -2) }, + Tm: BigInt(1000000), } as any), } as any) const out = t.config.handlers.message(makeStreamBuffer(md)) @@ -45,7 +56,7 @@ describe('LWBA websocket transport', () => { }) test('subscribe builder: first subscribe returns frame, subsequent subscribes return undefined', () => { - const t = createLwbaWsTransport() as any + const t = createLwbaWsTransport(mockExtractData) as any const first = t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) const second = t.config.builders.subscribeMessage({ market: MARKET, isin: OTHER }) expect(first).toBeInstanceOf(Uint8Array) @@ -53,7 +64,7 @@ describe('LWBA websocket transport', () => { }) test('unsubscribe builder: removing last returns frame, otherwise undefined', () => { - const t = createLwbaWsTransport() as any + const t = createLwbaWsTransport(mockExtractData) as any t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) t.config.builders.subscribeMessage({ market: MARKET, isin: OTHER }) @@ -65,31 +76,68 @@ describe('LWBA websocket transport', () => { }) test('missing ISIN: handler returns []', () => { - const t = createLwbaWsTransport() as any + const t = createLwbaWsTransport(mockExtractData) as any t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) const md = create(MarketDataSchema, { - Dat: create(DataSchema, { Px: dec(100n, 0), Tm: 1_000_000n } as any), + Dat: create(DataSchema, { Px: dec(BigInt(100), 0), Tm: BigInt(1000000) } as any), } as any) const out = t.config.handlers.message(makeStreamBuffer(md)) expect(out).toEqual([]) }) - test('quote then trade: emits only when complete and reflects cached fields and timestamps', () => { - const t = createLwbaWsTransport() as any + test('defensive decoding: bad buffer returns []', () => { + const t = createLwbaWsTransport(mockExtractData) as any + const res = t.config.handlers.message(Buffer.from('not-a-protobuf')) + expect(res).toEqual([]) + }) + + test('open() refreshes TTL immediately and on interval', async () => { + jest.useFakeTimers() + const t = createLwbaWsTransport(mockExtractData) as any + + // stub framework bits + const writeTTL = jest.fn() + t.responseCache = { writeTTL } + t.subscriptionSet = { getAll: jest.fn().mockResolvedValue([]) } + + const ctx = { + adapterSettings: { + WS_API_ENDPOINT: 'wss://example', + API_KEY: 'key', + CACHE_MAX_AGE: 45000, + CACHE_TTL_REFRESH_MS: 60000, + }, + } as any + + await t.config.handlers.open({}, ctx) + expect(writeTTL).toHaveBeenCalledTimes(1) + + // Advance one full interval AND await the async callback + await jest.advanceTimersByTimeAsync(60000) + + expect(writeTTL).toHaveBeenCalledTimes(2) + + jest.useRealTimers() + }) +}) + +describe('LWBA Latest Price Transport', () => { + test('emits only when latestPrice is available', () => { + const t = priceProtobufWsTransport as any t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) // Quote (no latestPrice yet) -> should NOT emit const quoteDat = create(DataSchema, { - Bid: { Px: dec(10000n, -2) }, - Offer: { Px: dec(10100n, -2) }, - Tm: 5_000_000n, + Bid: { Px: dec(BigInt(10000), -2), Sz: dec(BigInt(2000), 0) }, + Offer: { Px: dec(BigInt(10100), -2), Sz: dec(BigInt(1000), 0) }, + Tm: BigInt(5000000), } as any) const quoteMd = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, Dat: quoteDat } as any) const quoteRes = t.config.handlers.message(makeStreamBuffer(quoteMd)) expect(quoteRes).toEqual([]) - // Trade (now latestPrice arrives) -> should emit with full set - const tradeDat = create(DataSchema, { Px: dec(9999n, -2), Tm: 6_000_000n } as any) + // Trade (now latestPrice arrives) -> should emit + const tradeDat = create(DataSchema, { Px: dec(BigInt(9999), -2), Tm: BigInt(6000000) } as any) const tradeMd = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, Dat: tradeDat } as any) const tradeRes = t.config.handlers.message(makeStreamBuffer(tradeMd)) @@ -97,74 +145,143 @@ describe('LWBA websocket transport', () => { const [entry] = tradeRes const d = entry.response.data + expect(d.latestPrice).toBe(99.99) + }) + + test('emits when complete data is available from cache', () => { + // This test runs after the previous test which populated the cache with quote data + const t = priceProtobufWsTransport as any + + // Since quote data is already in cache from previous test, adding trade data should trigger emission + const tradeDat = create(DataSchema, { Px: dec(BigInt(9999), -2), Tm: BigInt(6000000) } as any) + const tradeMd = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, Dat: tradeDat } as any) + const tradeRes = t.config.handlers.message(makeStreamBuffer(tradeMd)) + + // Should emit because we now have complete data (quote from previous test + trade from this test) + expect(tradeRes.length).toBe(1) + expect(tradeRes[0].response.data.latestPrice).toBe(99.99) + }) +}) + +describe('LWBA Metadata Transport', () => { + test('emits when complete bid/ask data with sizes is available', () => { + const t = lwbaProtobufWsTransport as any + const FRESH_ISIN = 'DE0005810055' // Use unique ISIN to avoid cache interference + t.config.builders.subscribeMessage({ market: MARKET, isin: FRESH_ISIN }) + + // Complete quote with bid, ask, and sizes -> should emit + const quoteDat = create(DataSchema, { + Bid: { Px: dec(BigInt(10000), -2), Sz: dec(BigInt(2000), 0) }, + Offer: { Px: dec(BigInt(10100), -2), Sz: dec(BigInt(1000), 0) }, + Tm: BigInt(5000000), + } as any) + const quoteMd = create(MarketDataSchema, { Instrmt: { Sym: FRESH_ISIN }, Dat: quoteDat } as any) + const quoteRes = t.config.handlers.message(makeStreamBuffer(quoteMd)) + + expect(quoteRes.length).toBe(1) + const [entry] = quoteRes + const d = entry.response.data + expect(d.bid).toBe(100) expect(d.ask).toBe(101) expect(d.mid).toBe(100.5) - expect(d.latestPrice).toBe(99.99) - expect(d.quoteProviderIndicatedTimeUnixMs).toBe(5) - expect(d.tradeProviderIndicatedTimeUnixMs).toBe(6) - expect(entry.response.timestamps.providerIndicatedTimeUnixMs).toBe(6) + expect(d.bidSize).toBe(2000) + expect(d.askSize).toBe(1000) }) - test('bid-only then ask-only then trade → emits once both quote & trade are known', () => { - const t = createLwbaWsTransport() as any + test('bid-only then ask-only then trade → emits when complete', () => { + const t = lwbaProtobufWsTransport as any t.config.builders.subscribeMessage({ market: MARKET, isin: ISIN }) - // bid-only + // bid-only -> might emit if there's already trade data in cache from previous tests const bidOnly = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, - Dat: create(DataSchema, { Bid: { Px: dec(10000n, -2) }, Tm: 10_000_000n } as any), + Dat: create(DataSchema, { + Bid: { Px: dec(BigInt(10000), -2), Sz: dec(BigInt(2000), 0) }, + Tm: BigInt(10000000), + } as any), } as any) - expect(t.config.handlers.message(makeStreamBuffer(bidOnly))).toEqual([]) + const bidResult = t.config.handlers.message(makeStreamBuffer(bidOnly)) + // The result depends on whether there's already trade data in the cache + if (bidResult.length > 0) { + // If it emits, verify the data is reasonable (bid + cached data) + expect(bidResult[0].response.data.bid).toBe(100) + } - // ask-only + // ask-only -> add ask data to cache const askOnly = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, - Dat: create(DataSchema, { Offer: { Px: dec(10200n, -2) }, Tm: 11_000_000n } as any), + Dat: create(DataSchema, { + Offer: { Px: dec(BigInt(10200), -2), Sz: dec(BigInt(750), 0) }, + Tm: BigInt(11000000), + } as any), } as any) - expect(t.config.handlers.message(makeStreamBuffer(askOnly))).toEqual([]) + t.config.handlers.message(makeStreamBuffer(askOnly)) - // trade → emit + // trade → should definitely emit now that we have complete fresh data const trade = create(MarketDataSchema, { Instrmt: { Sym: ISIN }, - Dat: create(DataSchema, { Px: dec(10100n, -2), Tm: 12_000_000n } as any), + Dat: create(DataSchema, { + Px: dec(BigInt(10100), -2), + Sz: dec(BigInt(500), 0), + Tm: BigInt(12000000), + } as any), } as any) - const [entry] = t.config.handlers.message(makeStreamBuffer(trade)) + const result = t.config.handlers.message(makeStreamBuffer(trade)) + expect(result.length).toBe(1) + + const [entry] = result expect(entry.response.data.bid).toBe(100) expect(entry.response.data.ask).toBe(102) expect(entry.response.data.mid).toBe(101) - expect(entry.response.data.latestPrice).toBe(101) - expect(entry.response.data.quoteProviderIndicatedTimeUnixMs).toBe(11) - expect(entry.response.data.tradeProviderIndicatedTimeUnixMs).toBe(12) }) - test('defensive decoding: bad buffer returns []', () => { - const t = createLwbaWsTransport() as any - const res = t.config.handlers.message(Buffer.from('not-a-protobuf')) - expect(res).toEqual([]) - }) + test('protobuf with bid/ask sizes are handled correctly', () => { + const t = lwbaProtobufWsTransport as any + t.config.builders.subscribeMessage({ market: MARKET, isin: OTHER }) // Use different ISIN to avoid cache interference - test('open() refreshes TTL immediately and on interval', async () => { - jest.useFakeTimers() // modern timers in your Jest config - const t = createLwbaWsTransport() as any + // Quote with sizes -> should emit immediately as all required data is present + const quoteDat = create(DataSchema, { + Bid: { Px: dec(BigInt(9500), -2), Sz: dec(BigInt(1500), 0) }, + Offer: { Px: dec(BigInt(9600), -2), Sz: dec(BigInt(1600), 0) }, + Tm: BigInt(7000000), + } as any) + const quoteMd = create(MarketDataSchema, { Instrmt: { Sym: OTHER }, Dat: quoteDat } as any) + const quoteRes = t.config.handlers.message(makeStreamBuffer(quoteMd)) - // stub framework bits - const writeTTL = jest.fn() - t.responseCache = { writeTTL } - t.subscriptionSet = { getAll: jest.fn().mockResolvedValue([]) } + expect(quoteRes.length).toBe(1) + const [entry] = quoteRes + const d = entry.response.data - const ctx = { - adapterSettings: { WS_API_ENDPOINT: 'wss://example', API_KEY: 'key', CACHE_MAX_AGE: 45_000 }, - } as any + expect(d.bid).toBe(95) + expect(d.ask).toBe(96) + expect(d.mid).toBe(95.5) + expect(d.bidSize).toBe(1500) + expect(d.askSize).toBe(1600) + }) - await t.config.handlers.open({}, ctx) - expect(writeTTL).toHaveBeenCalledTimes(1) + test('protobuf with zero bid/ask sizes emits successfully', () => { + const t = lwbaProtobufWsTransport as any + const TEST_ISIN = 'TEST123456789' // Use unique ISIN to avoid cache interference + t.config.builders.subscribeMessage({ market: MARKET, isin: TEST_ISIN }) - // Advance one full interval AND await the async callback - await jest.advanceTimersByTimeAsync(60_000) + // Quote with zero sizes -> lwba transport treats 0 as valid, so it should emit + const quoteDat = create(DataSchema, { + Bid: { Px: dec(BigInt(8500), -2), Sz: dec(BigInt(0), 0) }, + Offer: { Px: dec(BigInt(8600), -2), Sz: dec(BigInt(0), 0) }, + Tm: BigInt(9000000), + } as any) + const quoteMd = create(MarketDataSchema, { Instrmt: { Sym: TEST_ISIN }, Dat: quoteDat } as any) + const quoteRes = t.config.handlers.message(makeStreamBuffer(quoteMd)) - expect(writeTTL).toHaveBeenCalledTimes(2) + expect(quoteRes.length).toBe(1) + const [entry] = quoteRes + const d = entry.response.data - jest.useRealTimers() + expect(d.bid).toBe(85) + expect(d.ask).toBe(86) + expect(d.mid).toBe(85.5) + expect(d.bidSize).toBe(0) + expect(d.askSize).toBe(0) }) })