Skip to content

Commit e06423d

Browse files
committed
fix(core): cap retries for unprocessed requests in addRequestsBatched
When the request queue permanently rejects a request (e.g. a 400 from the platform batch-add API for a malformed `userData` shape), the recursive helper kept resubmitting the unprocessed requests forever, hanging the crawler with no actionable error. Bound the recursion to a small number of consecutive no-progress attempts; once exhausted, the remaining unprocessed requests are skipped with a warning. Any progress resets the counter so transient backpressure is still retried. Closes #3764
1 parent 1fe3e7c commit e06423d

2 files changed

Lines changed: 49 additions & 1 deletion

File tree

packages/core/src/storages/request_provider.ts

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ import { getRequestId, purgeDefaultStorages, QUERY_HEAD_MIN_LENGTH } from './uti
3838

3939
export type RequestsLike = AsyncIterable<Source | string> | Iterable<Source | string> | (Source | string)[];
4040

41+
/**
42+
* Maximum number of consecutive batch-add attempts that make no progress before the remaining
43+
* unprocessed requests are skipped, so permanently rejected requests don't retry forever.
44+
*/
45+
const MAX_UNPROCESSED_REQUESTS_RETRIES = 3;
46+
4147
/**
4248
* Represents a provider of requests/URLs to crawl.
4349
*/
@@ -472,12 +478,29 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
472478
);
473479
const chunksIterator = chunks[Symbol.asyncIterator]();
474480

475-
const attemptToAddToQueueAndAddAnyUnprocessed = async (providedRequests: Source[], cache = true) => {
481+
const attemptToAddToQueueAndAddAnyUnprocessed = async (
482+
providedRequests: Source[],
483+
cache = true,
484+
unsuccessfulAttempts = 0,
485+
) => {
476486
const resultsToReturn: ProcessedRequest[] = [];
477487
const apiResult = await this.addRequests(providedRequests, { forefront: options.forefront, cache });
478488
resultsToReturn.push(...apiResult.processedRequests);
479489

480490
if (apiResult.unprocessedRequests.length) {
491+
// Count attempts that make no progress, so permanently rejected requests (e.g. a malformed
492+
// `userData` shape causing a 400) don't loop forever. Any progress resets the counter.
493+
const attempts = apiResult.processedRequests.length ? 0 : unsuccessfulAttempts + 1;
494+
495+
if (attempts >= MAX_UNPROCESSED_REQUESTS_RETRIES) {
496+
this.log.warning(
497+
`Some requests were consistently rejected by the request queue and will be skipped after ${MAX_UNPROCESSED_REQUESTS_RETRIES} attempts. This usually means the request data is malformed (e.g. an invalid 'userData' shape).`,
498+
{ unprocessedRequests: apiResult.unprocessedRequests },
499+
);
500+
501+
return resultsToReturn;
502+
}
503+
481504
await sleep(waitBetweenBatchesMillis);
482505

483506
resultsToReturn.push(
@@ -486,6 +509,7 @@ export abstract class RequestProvider implements IStorage, IRequestManager {
486509
(r) => !apiResult.processedRequests.some((pr) => pr.uniqueKey === r.uniqueKey),
487510
),
488511
false,
512+
attempts,
489513
)),
490514
);
491515
}

test/core/storages/request_queue.test.ts

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,30 @@ describe('RequestQueue remote', () => {
278278
expect(mockAddRequests).toBeCalledWith([requestB, requestC], { forefront: true });
279279
});
280280

281+
test('addRequestsBatched does not retry permanently unprocessed requests forever', async () => {
282+
const queue = new RequestQueue({ id: 'unprocessed-requests', client: storageClient });
283+
const mockAddRequests = vitest.spyOn(queue.client, 'batchAddRequests');
284+
285+
const requestOptions = { url: 'http://example.com/bad' };
286+
const request = new Request(requestOptions);
287+
288+
// Simulate the platform permanently rejecting the request (e.g. a 400 due to a malformed `userData` shape):
289+
// it is always reported back as unprocessed.
290+
mockAddRequests.mockResolvedValue({
291+
processedRequests: [],
292+
unprocessedRequests: [{ uniqueKey: request.uniqueKey, url: request.url, method: 'GET' }],
293+
});
294+
295+
const logWarningSpy = vitest.spyOn(queue.log, 'warning');
296+
297+
const result = await queue.addRequestsBatched([requestOptions], { waitBetweenBatchesMillis: 0 });
298+
299+
// Must not hang: it gives up after a bounded number of attempts and warns about the skipped requests.
300+
expect(result.addedRequests).toHaveLength(0);
301+
expect(logWarningSpy).toHaveBeenCalled();
302+
expect(mockAddRequests.mock.calls.length).toBeLessThan(20);
303+
});
304+
281305
test('should cache new requests locally', async () => {
282306
const queue = new RequestQueue({ id: 'some-id', client: storageClient });
283307

0 commit comments

Comments
 (0)