Skip to content

Commit f315078

Browse files
committed
Auto retry when a feed is rate limited based on host
1 parent e4c0030 commit f315078

File tree

5 files changed

+165
-55
lines changed

5 files changed

+165
-55
lines changed

services/feed-requests/src/feed-fetcher/feed-fetcher-listener.service.ts

Lines changed: 79 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import { RequestSource } from './constants/request-source.constants';
1212
import PartitionedRequestsStoreService from '../partitioned-requests-store/partitioned-requests-store.service';
1313
import { PartitionedRequestInsert } from '../partitioned-requests-store/types/partitioned-request.type';
1414
import { HostRateLimiterService } from '../host-rate-limiter/host-rate-limiter.service';
15+
import retryUntilTrue, {
16+
RetryException,
17+
} from '../shared/utils/retry-until-true';
1518

1619
interface BatchRequestMessage {
1720
timestamp: number;
@@ -65,81 +68,103 @@ export class FeedFetcherListenerService {
6568

6669
@UseRequestContext()
6770
private async onBrokerFetchRequestBatchHandler(
68-
message: BatchRequestMessage,
71+
batchRequest: BatchRequestMessage,
6972
): Promise<void> {
70-
const urls = message?.data?.map((u) => u.url);
71-
const rateSeconds = message?.rateSeconds;
73+
const urls = batchRequest?.data?.map((u) => u.url);
74+
const rateSeconds = batchRequest?.rateSeconds;
7275

73-
if (!message.data || rateSeconds == null) {
76+
if (!batchRequest.data || rateSeconds == null) {
7477
logger.error(
7578
`Received fetch batch request message has no urls and/or rateSeconds, skipping`,
7679
{
77-
event: message,
80+
event: batchRequest,
7881
},
7982
);
8083

8184
return;
8285
}
8386

8487
logger.debug(`Fetch batch request message received for batch urls`, {
85-
event: message,
88+
event: batchRequest,
8689
});
8790

8891
try {
8992
const results = await Promise.allSettled(
90-
message.data.map(
91-
async ({ url, lookupKey, saveToObjectStorage, headers }) => {
92-
let request: PartitionedRequestInsert | undefined = undefined;
93-
94-
try {
95-
const { isRateLimited } =
96-
await this.hostRateLimiterService.incrementUrlCount(url);
97-
98-
if (isRateLimited) {
99-
logger.debug(`Host ${url} is rate limited, skipping`);
100-
101-
return;
102-
}
93+
batchRequest.data.map(async (message) => {
94+
const { url, lookupKey, saveToObjectStorage, headers } = message;
95+
let request: PartitionedRequestInsert | undefined = undefined;
96+
97+
try {
98+
await retryUntilTrue(
99+
async () => {
100+
const { isRateLimited } =
101+
await this.hostRateLimiterService.incrementUrlCount(url);
102+
103+
if (isRateLimited) {
104+
logger.debug(
105+
`Host ${url} is still rate limited, retrying later`,
106+
);
107+
}
108+
109+
return !isRateLimited;
110+
},
111+
5000,
112+
(rateSeconds * 1000) / 1.5, // 1.5 is the backoff factor of retryUntilTrue
113+
);
114+
115+
const result = await this.handleBrokerFetchRequest({
116+
lookupKey,
117+
url,
118+
rateSeconds,
119+
saveToObjectStorage,
120+
headers,
121+
});
122+
123+
if (result) {
124+
request = result.request;
125+
}
103126

104-
const result = await this.handleBrokerFetchRequest({
127+
if (result.successful) {
128+
await this.emitFetchCompleted({
105129
lookupKey,
106130
url,
107-
rateSeconds,
108-
saveToObjectStorage,
109-
headers,
131+
rateSeconds: rateSeconds,
110132
});
111-
112-
if (result) {
113-
request = result.request;
114-
}
115-
116-
if (result.successful) {
117-
await this.emitFetchCompleted({
118-
lookupKey,
133+
}
134+
} catch (err) {
135+
if (err instanceof RetryException) {
136+
logger.error(
137+
`Error while retrying due to host rate limits: ${err.message}`,
138+
{
139+
event: message,
140+
err: (err as Error).stack,
141+
},
142+
);
143+
} else {
144+
logger.error(`Error processing fetch request message`, {
145+
event: message,
146+
err: (err as Error).stack,
147+
});
148+
}
149+
} finally {
150+
if (batchRequest.timestamp) {
151+
const nowTs = Date.now();
152+
const finishedTs = nowTs - batchRequest.timestamp;
153+
154+
logger.datadog(
155+
`Finished handling feed requests batch event URL in ${finishedTs}s`,
156+
{
157+
duration: finishedTs,
119158
url,
120-
rateSeconds: rateSeconds,
121-
});
122-
}
123-
} finally {
124-
if (message.timestamp) {
125-
const nowTs = Date.now();
126-
const finishedTs = nowTs - message.timestamp;
127-
128-
logger.datadog(
129-
`Finished handling feed requests batch event URL in ${finishedTs}s`,
130-
{
131-
duration: finishedTs,
132-
url,
133-
lookupKey,
134-
requestStatus: request?.status,
135-
statusCode: request?.response?.statusCode,
136-
errorMessage: request?.errorMessage,
137-
},
138-
);
139-
}
159+
lookupKey,
160+
requestStatus: request?.status,
161+
statusCode: request?.response?.statusCode,
162+
errorMessage: request?.errorMessage,
163+
},
164+
);
140165
}
141-
},
142-
),
166+
}
167+
}),
143168
);
144169

145170
for (let i = 0; i < results.length; ++i) {
@@ -163,7 +188,7 @@ export class FeedFetcherListenerService {
163188
});
164189
} catch (err) {
165190
logger.error(`Error processing fetch batch request message`, {
166-
event: message,
191+
event: batchRequest,
167192
err: (err as Error).stack,
168193
});
169194
}

services/feed-requests/src/host-rate-limiter/host-rate-limiter.service.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@ const RATE_LIMITED_HOSTS = new Map<string, RateLimitData>([
1515
intervalSec: 2,
1616
},
1717
],
18+
[
19+
'nasdaq.com',
20+
{
21+
requestLimit: 1,
22+
intervalSec: 30,
23+
},
24+
],
1825
]);
1926

2027
@Injectable()

services/feed-requests/src/partitioned-requests-store/partitioned-requests-store.service.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@ import { MikroORM } from '@mikro-orm/core';
55
import { RequestSource } from '../feed-fetcher/constants/request-source.constants';
66
import { Request } from '../feed-fetcher/entities';
77
import { RequestStatus } from '../feed-fetcher/constants';
8+
import { getUrlHost } from '../utils/get-url-host';
9+
import logger from '../utils/logger';
810

911
const sha1 = createHash('sha1');
1012

@@ -32,6 +34,17 @@ export default class PartitionedRequestsStoreService {
3234
await Promise.all(
3335
this.pendingInserts.map((responseInsert) => {
3436
const urlHash = sha1.copy().update(responseInsert.url).digest('hex');
37+
let hostHash: string | null = null;
38+
39+
try {
40+
const host = getUrlHost(responseInsert.url);
41+
hostHash = sha1.copy().update(host).digest('hex');
42+
} catch (err) {
43+
logger.error('Failed to get host from url', {
44+
url: responseInsert.url,
45+
err: (err as Error).stack,
46+
});
47+
}
3548

3649
return em.execute(
3750
`INSERT INTO request_partitioned (
@@ -41,6 +54,7 @@ export default class PartitionedRequestsStoreService {
4154
fetch_options,
4255
url,
4356
url_hash,
57+
host_hash,
4458
lookup_key,
4559
created_at,
4660
next_retry_date,
@@ -51,7 +65,7 @@ export default class PartitionedRequestsStoreService {
5165
response_redis_cache_key,
5266
response_headers
5367
) VALUES (
54-
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
68+
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
5569
)`,
5670
[
5771
randomUUID(),
@@ -60,6 +74,7 @@ export default class PartitionedRequestsStoreService {
6074
this.stringifyJson(responseInsert.fetchOptions),
6175
responseInsert.url,
6276
urlHash,
77+
hostHash,
6378
responseInsert.lookupKey,
6479
responseInsert.createdAt,
6580
responseInsert.nextRetryDate,
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/**
 * Error raised by {@link retryUntilTrue} when it gives up, or when the
 * supplied callback throws. Callers can use `instanceof RetryException` to
 * distinguish retry failures from other errors.
 */
export class RetryException extends Error {
  constructor(message?: string) {
    super(message);
    // Make the class name show up in logs and stack traces instead of "Error".
    this.name = 'RetryException';
  }
}

/** Multiplier applied to the wait time after every unsuccessful attempt. */
const BACKOFF_FACTOR = 1.5;

/** Resolves after `timeout` milliseconds. */
const delay = (timeout: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, timeout));

/**
 * Normalizes anything thrown by the callback into a RetryException,
 * preserving the original message and stack when they are available.
 */
const toRetryException = (err: unknown): RetryException => {
  if (err instanceof RetryException) {
    return err;
  }

  if (err instanceof Error) {
    const wrapped = new RetryException(err.message);
    wrapped.stack = err.stack;

    return wrapped;
  }

  // Non-Error throwables (strings, plain objects) have no `.message`.
  return new RetryException(String(err));
};

/**
 * Repeatedly calls the provided function until it returns true or the maximum timeout is reached.
 *
 * The wait between attempts starts at `startTimeout` and is multiplied by 1.5
 * after every unsuccessful attempt. Once the next wait would no longer be
 * below `maxTimeout`, the function gives up immediately instead of sleeping
 * first, because no further attempt would follow that sleep.
 *
 * @param {() => Promise<boolean>} fn - The function to be called repeatedly.
 * @param {number} startTimeout - The initial timeout in milliseconds.
 * @param {number} maxTimeout - The maximum timeout in milliseconds.
 * Uses exponential backoff for retries.
 * @returns {Promise<void>} A promise that resolves when the function returns true or rejects if
 * the maximum timeout is reached.
 * @throws {RetryException} When the maximum timeout is reached, or when `fn`
 * throws (the original message and stack are carried over).
 */
const retryUntilTrue = async (
  fn: () => Promise<boolean>,
  startTimeout: number,
  maxTimeout: number,
): Promise<void> => {
  let currentTimeout = startTimeout;

  for (;;) {
    let succeeded: boolean;

    try {
      succeeded = await fn();
    } catch (err) {
      throw toRetryException(err);
    }

    if (succeeded) {
      return;
    }

    const nextTimeout = currentTimeout * BACKOFF_FACTOR;

    // Written as a negated "<" so that a NaN timeout also ends the loop
    // instead of retrying forever.
    if (!(nextTimeout < maxTimeout)) {
      throw new RetryException(
        `Timeout reached (next timeout of ${nextTimeout} is not less than max timeout of` +
          ` ${maxTimeout})`,
      );
    }

    await delay(currentTimeout);
    currentTimeout = nextTimeout;
  }
};
58+
export default retryUntilTrue;
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
import { URL } from 'node:url';
2+
3+
/**
 * Returns the host portion of an absolute URL.
 *
 * The value comes from the WHATWG `URL` parser, so it is lower-cased and
 * includes the port when the URL specifies a non-default one
 * (e.g. `example.com:8080`).
 *
 * @param url - Absolute URL string to parse.
 * @returns The host (hostname plus optional port) of `url`.
 * @throws {TypeError} If `url` cannot be parsed as an absolute URL.
 */
export const getUrlHost = (url: string): string => {
  return new URL(url).host;
};

0 commit comments

Comments
 (0)