Skip to content

Commit 0b5b440

Browse files
committed
Add exp backoff for fetching trades
1 parent 6d17c58 commit 0b5b440

File tree

1 file changed

+32
-19
lines changed

1 file changed

+32
-19
lines changed

src/data-feeds/ccxt-provider-service.ts

Lines changed: 32 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ import ccxt, { Exchange, Trade } from "ccxt";
33
import { readFileSync } from "fs";
44
import { FeedId, FeedValueData, FeedVolumeData, Volume } from "../dto/provider-requests.dto";
55
import { BaseDataFeed } from "./base-feed";
6-
import { retry, sleepFor } from "src/utils/retry";
6+
import { retry, RetryError, sleepFor } from "src/utils/retry";
77
import { VolumeStore } from "./volumes";
88
import { asError } from "../utils/error";
99

@@ -184,30 +184,43 @@ export class CcxtFeed implements BaseDataFeed {
184184
marketIds.forEach(marketId => void this.watchTradesForSymbol(exchange, marketId));
185185
} else {
186186
this.logger.warn(`Exchange ${exchange.id} does not support watching trades, polling for trades instead`);
187-
this.fetchTrades(exchange, marketIds, exchangeName);
187+
void this.fetchTrades(exchange, marketIds, exchangeName);
188188
}
189189
}
190190

191-
private fetchTrades(exchange: Exchange, marketIds: string[], exchangeName: string) {
192-
setInterval(async () => {
191+
private async fetchTrades(exchange: Exchange, marketIds: string[], exchangeName: string) {
192+
while (true) {
193193
try {
194-
for (const marketId of marketIds) {
195-
const trades = await exchange.fetchTrades(marketId);
196-
if (trades.length > 0) {
197-
trades.sort((a, b) => b.timestamp - a.timestamp);
198-
const latestTrade = trades[0];
199-
if (latestTrade.timestamp > (this.latestPrice.get(latestTrade.symbol)?.get(exchange.id)?.time || 0)) {
200-
this.setPrice(exchange.id, latestTrade.symbol, latestTrade.price, latestTrade.timestamp);
194+
await retry(
195+
async () => {
196+
for (const marketId of marketIds) {
197+
const trades = await exchange.fetchTrades(marketId);
198+
if (trades.length > 0) {
199+
trades.sort((a, b) => b.timestamp - a.timestamp);
200+
const latestTrade = trades[0];
201+
if (latestTrade.timestamp > (this.latestPrice.get(latestTrade.symbol)?.get(exchange.id)?.time || 0)) {
202+
this.setPrice(exchange.id, latestTrade.symbol, latestTrade.price, latestTrade.timestamp);
203+
}
204+
} else {
205+
this.logger.warn(`No trades found for ${marketId} on ${exchangeName}`);
206+
}
201207
}
202-
} else {
203-
this.logger.warn(`No trades found for ${marketId} on ${exchangeName}`);
204-
}
205-
}
206-
} catch (error) {
207-
const err = asError(error);
208-
this.logger.error(`Error fetching trades for ${exchangeName}/${marketIds}: ${err.message}, will retry.`);
208+
},
209+
5,
210+
2000,
211+
this.logger
212+
);
213+
await sleepFor(1_000); // Wait 1 second before the next fetch
214+
} catch (e) {
215+
const error = asError(e);
216+
if (error instanceof RetryError) {
217+
this.logger.debug(
218+
`Failed to fetch trades after multiple retries for ${exchange.id}/${marketIds}: ${error.cause}, will attempt again in 5 minutes`
219+
);
220+
await sleepFor(300_000); // Wait 5 minutes, we must be rate-limited
221+
} else throw error;
209222
}
210-
}, 1000);
223+
}
211224
}
212225

213226
private async watchTradesForSymbols(exchange: Exchange, marketIds: string[]) {

0 commit comments

Comments
 (0)