Skip to content

Commit d841c79

Browse files
committed
fix: take a better approach when identifying subscription responses - use correlation
1 parent e5565f4 commit d841c79

File tree

2 files changed

+39
-11
lines changed

2 files changed

+39
-11
lines changed

src/connection/listeners.ts

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { OpenfeedGatewayMessage, Result, SubscriptionType } from "@gen/openfeed_api";
1+
import { OpenfeedGatewayMessage, Result } from "@gen/openfeed_api";
22
import { InstrumentDefinition } from "@gen/openfeed_instrument";
33
import Long from "long";
44
import { ActionType, HeartBeat } from "@gen/openfeed";
@@ -11,9 +11,9 @@ const IDGetters: ((msg: OpenfeedGatewayMessage) => Long | undefined)[] = [
1111
(msg) => msg.volumeAtPrice?.marketId,
1212
];
1313
export class OpenFeedListeners {
14-
private readonly instrumentByMarketId: Map<string, [InstrumentDefinition?, [string, SubscriptionType][]?]> = new Map<
14+
private readonly instrumentByMarketId: Map<string, [InstrumentDefinition?, [string, string][]?]> = new Map<
1515
string,
16-
[InstrumentDefinition?, [string, SubscriptionType][]?]
16+
[InstrumentDefinition?, [string, string][]?]
1717
>();
1818

1919
constructor() {
@@ -22,39 +22,44 @@ export class OpenFeedListeners {
2222

2323
private addDetails = (message: OpenfeedGatewayMessage) => {
2424
let def: InstrumentDefinition | undefined;
25-
let symbols: [string, SubscriptionType][] | undefined;
25+
let symbols: [string, string][] | undefined;
2626

2727
const getInstrumentDefinition = (marketId: Long) => {
2828
const res = this.instrumentByMarketId.get(marketId.toString());
2929

3030
return res ?? [undefined, undefined];
3131
};
3232

33-
const includesSymbolSubscription = (arr: [string, SubscriptionType][], item: [string, SubscriptionType]) => {
34-
return arr.some(([symbol, type]) => symbol === item[0] && type === item[1]);
33+
const includesSymbolSubscription = (arr: [string, string][], item: [string, string]) => {
34+
return arr.some(([symbol, correlationId]) => symbol === item[0] && correlationId === item[1]);
3535
};
3636

3737
if (message.subscriptionResponse) {
38-
const { marketId, symbol, unsubscribe, status, subscriptionType } = message.subscriptionResponse;
38+
const { marketId, symbol, unsubscribe, status, correlationId } = message.subscriptionResponse;
39+
const corIdStr = correlationId.toString();
3940
if (marketId !== Long.ZERO) {
4041
[def, symbols] = getInstrumentDefinition(marketId);
4142
if (status?.result === Result.SUCCESS) {
42-
const currentEntry: [string, SubscriptionType] = [symbol, subscriptionType];
43+
const currentEntry: [string, string] = [symbol, corIdStr];
44+
4345
if (!unsubscribe) {
4446
if (!symbols) {
4547
symbols = [currentEntry];
4648
} else if (!includesSymbolSubscription(symbols, currentEntry)) {
4749
symbols = [...symbols, currentEntry];
4850
}
51+
this.instrumentByMarketId.set(marketId.toString(), [def, symbols]);
4952
} else {
53+
let symbolsToSave: [string, string][] | undefined;
5054
if (symbols) {
51-
symbols = symbols.filter(([s, t]) => !(s === symbol && t === subscriptionType));
55+
symbolsToSave = symbols.filter(([s, t]) => !(s === symbol && t === corIdStr));
5256
}
53-
if (!symbols) {
57+
if (!symbolsToSave?.length) {
5458
this.instrumentByMarketId.delete(marketId.toString());
59+
} else {
60+
this.instrumentByMarketId.set(marketId.toString(), [def, symbolsToSave]);
5561
}
5662
}
57-
this.instrumentByMarketId.set(marketId.toString(), [def, symbols]);
5863
}
5964
}
6065
} else if (message.instrumentDefinition) {

test/listeners.test.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ const ALIAS_REGULAR_NEW = "SYM*5";
1010
const MESSAGES = {
1111
SUBSCRIPTION_RESPONSE_MESSAGE_OLD: {
1212
subscriptionResponse: {
13+
correlationId: Long.fromNumber(1),
1314
marketId: MARKET_ID,
1415
symbol: ALIAS_REGULAR_OLD,
1516
unsubscribe: false,
@@ -21,6 +22,7 @@ const MESSAGES = {
2122
} as Partial<OpenfeedGatewayMessage> as OpenfeedGatewayMessage,
2223
SUBSCRIPTION_RESPONSE_DEPTH_OLD: {
2324
subscriptionResponse: {
25+
correlationId: Long.fromNumber(2),
2426
marketId: MARKET_ID,
2527
symbol: ALIAS_REGULAR_OLD,
2628
unsubscribe: false,
@@ -51,6 +53,7 @@ const MESSAGES = {
5153
} as Partial<OpenfeedGatewayMessage> as OpenfeedGatewayMessage,
5254
SUBSCRIPTION_RESPONSE_MESSAGE_NEW: {
5355
subscriptionResponse: {
56+
correlationId: Long.fromNumber(3),
5457
marketId: MARKET_ID,
5558
symbol: ALIAS_REGULAR_NEW,
5659
unsubscribe: false,
@@ -83,6 +86,7 @@ const MESSAGES = {
8386
} as Partial<OpenfeedGatewayMessage> as OpenfeedGatewayMessage,
8487
DUMMY_SUBSCRIPTION_UNAUTHORIZED: {
8588
subscriptionResponse: {
89+
correlationId: Long.fromNumber(1),
8690
marketId: MARKET_ID,
8791
symbol: ALIAS_REGULAR_OLD,
8892
unsubscribe: false,
@@ -94,6 +98,7 @@ const MESSAGES = {
9498
} as Partial<OpenfeedGatewayMessage> as OpenfeedGatewayMessage,
9599
DUMMY_UNSUBSCRIPTION_UNSUBSCRIBED: {
96100
subscriptionResponse: {
101+
correlationId: Long.fromNumber(1),
97102
marketId: MARKET_ID,
98103
symbol: ALIAS_REGULAR_OLD,
99104
unsubscribe: true,
@@ -105,6 +110,7 @@ const MESSAGES = {
105110
} as Partial<OpenfeedGatewayMessage> as OpenfeedGatewayMessage,
106111
DUMMY_UNSUBSCRIPTION_DEPTH: {
107112
subscriptionResponse: {
113+
correlationId: Long.fromNumber(2),
108114
marketId: MARKET_ID,
109115
symbol: ALIAS_REGULAR_OLD,
110116
unsubscribe: true,
@@ -236,6 +242,23 @@ describe("OpenFeedListeners", () => {
236242
);
237243
});
238244

245+
it("should handle unsubscription messages correctly", () => {
246+
// We require the subscriptionResponse and instrumentDefinition message to arrive first
247+
listeners.onMessage(MESSAGES.SUBSCRIPTION_RESPONSE_DEPTH_OLD);
248+
listeners.onMessage(MESSAGES.INSTRUMENT_DEFINITION_MESSAGE_OLD);
249+
listeners.onMessage(MESSAGES.DUMMY_UNSUBSCRIPTION_DEPTH);
250+
listeners.onMessage(MESSAGES.DUMMY_SNAPSHOT_MESSAGE);
251+
252+
expect(onMessageWithMetadataSpy).toHaveBeenCalledWith(MESSAGES.SUBSCRIPTION_RESPONSE_DEPTH_OLD, [ALIAS_REGULAR_OLD], undefined);
253+
expect(onMessageWithMetadataSpy).toHaveBeenCalledWith(MESSAGES.INSTRUMENT_DEFINITION_MESSAGE_OLD, [ALIAS_REGULAR_OLD], undefined);
254+
expect(onMessageWithMetadataSpy).toHaveBeenCalledWith(
255+
MESSAGES.DUMMY_UNSUBSCRIPTION_DEPTH,
256+
[ALIAS_REGULAR_OLD],
257+
MESSAGES.INSTRUMENT_DEFINITION_MESSAGE_OLD.instrumentDefinition
258+
);
259+
expect(onMessageWithMetadataSpy).toHaveBeenCalledWith(MESSAGES.DUMMY_SNAPSHOT_MESSAGE, [], undefined);
260+
});
261+
239262
it("should handle alias_changed instrument action correctly", () => {
240263
// We expect the messages to arrive in the following order - subscriptionResponse, instrumentDefinition, marketSnapshot
241264
// after an alias change - we expect the subscriptionResponse and instrumentDefinition to be repeated so that we can reconnect everything

0 commit comments

Comments
 (0)