Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 127 additions & 36 deletions packages/core/src/storages/sitemap_request_list.ts
Original file line number Diff line number Diff line change
Expand Up @@ -103,18 +103,30 @@
* Crawlee configuration
*/
config?: Configuration;
/**
* Maximum number of sitemaps to load in parallel.
* This includes both the initial sitemaps and any nested sitemaps discovered during parsing.
*
* @default 1
*/
concurrency?: number;
}

interface SitemapParsingProgress {
inProgressSitemapUrl: string | null;
inProgressSitemapUrls: Set<string>;
inProgressEntries: Set<string>;
pendingSitemapUrls: Set<string>;
}

interface SitemapRequestListState {
urlQueue: string[];
reclaimed: string[];
sitemapParsingProgress: Record<keyof SitemapParsingProgress, any>;
sitemapParsingProgress: {
pendingSitemapUrls: string[];
inProgressSitemapUrls?: string[];
inProgressSitemapUrl?: string | null; // Backward compatibility with old state format
inProgressEntries: string[];
};
abortLoading: boolean;
closed: boolean;
requestData: [string, Request][];
Expand Down Expand Up @@ -147,19 +159,24 @@
*/
private sitemapParsingProgress: SitemapParsingProgress = {
/**
* URL of the sitemap that is currently being parsed. `null` if no sitemap is being parsed.
* Set of sitemap URLs that are currently being parsed.
*/
inProgressSitemapUrl: null,
inProgressSitemapUrls: new Set<string>(),
/**
* Buffer for URLs from the currently parsed sitemap. Used for tracking partially loaded sitemaps across migrations.
* Buffer for URLs from the currently parsed sitemaps. Used for tracking partially loaded sitemaps across migrations.
*/
inProgressEntries: new Set<string>(),
/**
* Set of sitemap URLs that have not been parsed yet. If the set is empty and `inProgressSitemapUrl` is `null`, the sitemap loading is finished.
* Set of sitemap URLs that have not been parsed yet. If the set is empty and `inProgressSitemapUrls` is empty, the sitemap loading is finished.
*/
pendingSitemapUrls: new Set<string>(),
};

/**
* Maximum number of sitemaps to load in parallel.
*/
private concurrency: number;

/**
* Object stream of URLs parsed from the sitemaps.
* Using `highWaterMark`, this can manage the speed of the sitemap loading.
Expand Down Expand Up @@ -224,10 +241,13 @@
regexps: ow.optional.array.ofType(ow.any(ow.regExp, ow.object.hasKeys('regexp'))),
config: ow.optional.object,
persistenceOptions: ow.optional.object,
concurrency: ow.optional.number.integer.greaterThanOrEqual(1),
}),
);

const { globs, exclude, regexps, config = Configuration.getGlobalConfig() } = options;
const { globs, exclude, regexps, config = Configuration.getGlobalConfig(), concurrency = 1 } = options;

this.concurrency = concurrency;

if (exclude?.length) {
for (const excl of exclude) {
Expand Down Expand Up @@ -357,11 +377,55 @@
*/
isSitemapFullyLoaded(): boolean {
return (
this.sitemapParsingProgress.inProgressSitemapUrl === null &&
this.sitemapParsingProgress.inProgressSitemapUrls.size === 0 &&
this.sitemapParsingProgress.pendingSitemapUrls.size === 0
);
}

/**
* Processes a single sitemap URL and emits its URLs to the queue.
* Nested sitemaps are added to the pending queue for later processing.
*/
private async loadSingleSitemap(
sitemapUrl: string,
parseSitemapOptions?: SitemapRequestListOptions['parseSitemapOptions'],
): Promise<void> {
this.sitemapParsingProgress.inProgressSitemapUrls.add(sitemapUrl);

try {
for await (const item of parseSitemap([{ type: 'url', url: sitemapUrl }], this.proxyUrl, {
...parseSitemapOptions,
maxDepth: 0,
emitNestedSitemaps: true,
})) {
if (this.abortLoading) {
break;
}

if (!item.originSitemapUrl) {
// This is a nested sitemap - add to pending if not already processed or in progress
if (
!this.sitemapParsingProgress.inProgressSitemapUrls.has(item.loc) &&
!this.sitemapParsingProgress.inProgressEntries.has(item.loc)
) {
this.sitemapParsingProgress.pendingSitemapUrls.add(item.loc);
}
continue;
}

if (!this.sitemapParsingProgress.inProgressEntries.has(item.loc)) {
await this.pushNextUrl(item.loc);
this.sitemapParsingProgress.inProgressEntries.add(item.loc);
Comment thread
baptistejamin marked this conversation as resolved.
Outdated
}
}
} catch (e: any) {
this.log.error(`Error loading sitemap contents from ${sitemapUrl}:`, e);
} finally {
this.sitemapParsingProgress.inProgressSitemapUrls.delete(sitemapUrl);
this.sitemapParsingProgress.pendingSitemapUrls.delete(sitemapUrl);
}
}

/**
* Start processing the sitemaps and loading the URLs.
*
Expand All @@ -372,35 +436,47 @@
}: {
parseSitemapOptions?: SitemapRequestListOptions['parseSitemapOptions'];
}): Promise<void> {
while (!this.isSitemapFullyLoaded() && !this.abortLoading) {
const sitemapUrl =
this.sitemapParsingProgress.inProgressSitemapUrl ??
this.sitemapParsingProgress.pendingSitemapUrls.values().next().value!;

try {
for await (const item of parseSitemap([{ type: 'url', url: sitemapUrl }], this.proxyUrl, {
...parseSitemapOptions,
maxDepth: 0,
emitNestedSitemaps: true,
})) {
if (!item.originSitemapUrl) {
// This is a nested sitemap
this.sitemapParsingProgress.pendingSitemapUrls.add(item.loc);
continue;
const activeLoaders: Promise<void>[] = [];

const startNewLoaders = () => {
while (
!this.abortLoading &&
activeLoaders.length < this.concurrency &&
this.sitemapParsingProgress.pendingSitemapUrls.size > 0
) {
const sitemapUrl = this.sitemapParsingProgress.pendingSitemapUrls.values().next().value!;
this.sitemapParsingProgress.pendingSitemapUrls.delete(sitemapUrl);

const loaderPromise = this.loadSingleSitemap(sitemapUrl, parseSitemapOptions).finally(() => {
const index = activeLoaders.indexOf(loaderPromise);
if (index !== -1) {
activeLoaders.splice(index, 1);

Check failure on line 453 in packages/core/src/storages/sitemap_request_list.ts

View workflow job for this annotation

GitHub Actions / Lint

An array of Promises may be unintentional. Consider handling the promises' fulfillment or rejection with Promise.all or similar, or explicitly marking the expression as ignored with the `void` operator
}
});

if (!this.sitemapParsingProgress.inProgressEntries.has(item.loc)) {
await this.pushNextUrl(item.loc);
this.sitemapParsingProgress.inProgressEntries.add(item.loc);
}
}
} catch (e: any) {
this.log.error('Error loading sitemap contents:', e);
activeLoaders.push(loaderPromise);
}
};

this.sitemapParsingProgress.pendingSitemapUrls.delete(sitemapUrl);
this.sitemapParsingProgress.inProgressEntries.clear();
this.sitemapParsingProgress.inProgressSitemapUrl = null;
// Start initial batch of loaders
startNewLoaders();

// Keep loading until all sitemaps are processed
while (
(activeLoaders.length > 0 || this.sitemapParsingProgress.pendingSitemapUrls.size > 0) &&
!this.abortLoading
) {
if (activeLoaders.length > 0) {
// Wait for at least one loader to complete
await Promise.race(activeLoaders);
}
// Start new loaders if slots are available
startNewLoaders();
}

// Wait for any remaining active loaders to complete
if (activeLoaders.length > 0) {
await Promise.all(activeLoaders);
}

this.urlQueueStream.end();
Expand Down Expand Up @@ -505,7 +581,7 @@
await this.store.setValue(this.persistStateKey, {
sitemapParsingProgress: {
pendingSitemapUrls: Array.from(this.sitemapParsingProgress.pendingSitemapUrls),
inProgressSitemapUrl: this.sitemapParsingProgress.inProgressSitemapUrl,
inProgressSitemapUrls: Array.from(this.sitemapParsingProgress.inProgressSitemapUrls),
inProgressEntries: Array.from(this.sitemapParsingProgress.inProgressEntries),
},
urlQueue,
Expand All @@ -531,9 +607,24 @@
}

this.reclaimed = new Set(state.reclaimed);

// Handle backward compatibility: old state had inProgressSitemapUrl (string | null),
// new state has inProgressSitemapUrls (array)
const inProgressSitemapUrls = state.sitemapParsingProgress.inProgressSitemapUrls
? new Set<string>(state.sitemapParsingProgress.inProgressSitemapUrls)
: new Set<string>();

// Backward compatibility: if old state had inProgressSitemapUrl, add it to pending for retry
if (state.sitemapParsingProgress.inProgressSitemapUrl && !state.sitemapParsingProgress.inProgressSitemapUrls) {
inProgressSitemapUrls.add(state.sitemapParsingProgress.inProgressSitemapUrl);
}

this.sitemapParsingProgress = {
pendingSitemapUrls: new Set(state.sitemapParsingProgress.pendingSitemapUrls),
inProgressSitemapUrl: state.sitemapParsingProgress.inProgressSitemapUrl,
pendingSitemapUrls: new Set([
...state.sitemapParsingProgress.pendingSitemapUrls,
...inProgressSitemapUrls, // Re-add in-progress sitemaps to pending for retry on restore
]),
inProgressSitemapUrls: new Set<string>(),
inProgressEntries: new Set(state.sitemapParsingProgress.inProgressEntries),
};

Expand Down
Loading
Loading