Commit c8bb798

Correctly utilize the assumeSoleOwner flag
1 parent c442f5b commit c8bb798

5 files changed

Lines changed: 172 additions & 21 deletions

docs/guides/parallel-scraping/parallel-scraper.mjs

Lines changed: 16 additions & 5 deletions
@@ -1,5 +1,6 @@
 import { fork } from 'node:child_process';
 
+import { FileSystemStorageClient } from '@crawlee/fs-storage';
 import { Configuration, Dataset, PlaywrightCrawler, log } from 'crawlee';
 
 import { router } from './routes.mjs';
@@ -76,13 +77,19 @@ if (!process.env.IN_WORKER_THREAD) {
     // Get the request queue
     const requestQueue = await getOrInitQueue(false);
 
-    // Disable the automatic purge on start and configure crawlee to store the worker-specific data in a separate directory
-    // (needs to be done AFTER the queue is initialized when running locally)
+    // Disable the automatic purge on start, so we don't lose the queue we prepared
     const config = new Configuration({
         purgeOnStart: false,
-        storageClientOptions: {
-            localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
-        },
+    });
+
+    // Store the worker's own internal state (its default dataset, key-value store, etc.) in a separate
+    // directory so the workers don't collide with each other (needs to be done AFTER the queue is
+    // initialized when running locally). This directory is private to a single worker, so we set
+    // `assumeSoleOwner: true`; the concurrency-safe locking only matters for the shared `shop-urls`
+    // queue, which gets its own storage client in `requestQueue.mjs`.
+    const storageClient = new FileSystemStorageClient({
+        localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
+        assumeSoleOwner: true,
     });
 
     workerLogger.debug('Setting up crawler.');
@@ -98,6 +105,10 @@ if (!process.env.IN_WORKER_THREAD) {
             // highlight-end
             // Let's also limit the crawler's concurrency, we don't want to overload a single process 🐌
             maxConcurrency: 5,
+            // Use the worker-specific storage client we created above (private to this worker,
+            // hence `assumeSoleOwner: true`; the concurrency-safe client belongs to the shared queue)
+            // highlight-start
+            storageClient,
+            // highlight-end
         },
         config,
     );
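
Note on the hunk above: the worker directory is derived from `process.env.WORKER_INDEX`, so if that variable were ever unset, the template literal would silently produce `./storage/worker-undefined` and every such worker would share one "private" directory. The following is only a sketch of a guard, assuming a hypothetical helper named `workerStorageOptions` (it is not part of this commit or of Crawlee) whose result would be passed to `new FileSystemStorageClient(...)`.

```javascript
// Hypothetical helper, not part of the commit: builds the per-worker storage options
// and fails loudly instead of producing `./storage/worker-undefined`.
function workerStorageOptions(env) {
    const index = env.WORKER_INDEX;
    if (index === undefined || index === '') {
        throw new Error('WORKER_INDEX must be set in worker processes');
    }
    return {
        localDataDirectory: `./storage/worker-${index}`,
        // The directory is private to one worker, so sole ownership is a safe assumption.
        assumeSoleOwner: true,
    };
}

const exampleOptions = workerStorageOptions({ WORKER_INDEX: '3' });
console.log(exampleOptions.localDataDirectory); // ./storage/worker-3
```

In the worker this would be called with `process.env`; whether failing fast is preferable to a fallback directory depends on how the workers are spawned.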

docs/guides/parallel-scraping/parallel-scraping.mdx

Lines changed: 33 additions & 13 deletions
@@ -60,6 +60,16 @@ The first step in our conversion process will be creating a common file (let's c
 
 The exported function, `getOrInitQueue`, might seem like it does a lot. In essence, it just ensures the request queue is initialized, and if requested, ensures it starts off with an empty state.
 
+:::caution Make the shared queue concurrency-safe with `assumeSoleOwner: false`
+
+Because every worker process opens this same `shop-urls` queue at the same time, it **must** use the concurrency-safe locking behavior of `FileSystemStorageClient`. That's why `getOrInitQueue` opens the queue with a storage client constructed with `assumeSoleOwner: false`.
+
+By default, `FileSystemStorageClient` assumes it is the *sole* consumer of a queue (`assumeSoleOwner: true`). On open, it immediately reclaims any requests left *in progress*. That is great for a single-process crawl recovering after a crash, but disastrous when workers run side by side: each worker would happily grab requests another worker is still processing, so the same URL gets scraped multiple times.
+
+Setting `assumeSoleOwner: false` tells the client to treat an in-progress request as a potential live peer's lock and only reclaim it once the lock expires on the wall clock, so two workers never process the same request at once.
+
+:::
+
 ### Adapting our previous scraper to enqueue the product URLs to the new queue
 
 In the `src/routes.mjs` file of the scraper we previously built, we have a handler for the `CATEGORY` label. Let's adapt that handler to enqueue the product URLs to the new queue we created.
@@ -122,34 +132,44 @@ This will check how the script is executed as. If this value has _any_ value, it
 
 We use this to ensure the parent process stays alive until all the worker processes exit. Otherwise, the worker processes would just get spawned, and lose the ability to communicate with the parent. You might not need this depending on your use case (maybe you just need to spawn workers and let them process).
 
-#### What's with all those `Configuration` calls?
+#### What's with all the `Configuration` and storage client setup?
 
-There are three steps we want to do for the worker processes:
+There are two things we want to do for the worker processes:
 
-- get the queue that supports locking from the same location as the parent process
-- ensure the default storages do **not** get purged on start, as otherwise we'd lose the queue we prepared, and initialize a special storage for worker processes so they do not collide with each other
+- get the shared queue from the same location as the parent process (it already comes with the concurrency-safe storage client we set up in `requestQueue.mjs`)
+- ensure the default storages do **not** get purged on start, as otherwise we'd lose the queue we prepared, and give each worker its own private storage directory for its internal state so the workers don't collide with each other
 
 In order, that's what these lines do:
 
 ```javascript title="src/parallel-scraper.mjs"
-// Get the request queue from the parent process (step 1)
+import { FileSystemStorageClient } from '@crawlee/fs-storage';
+
+// Get the shared request queue from the parent process (step 1)
 const requestQueue = await getOrInitQueue(false);
 
-// Disable the automatic purge on start and configure crawlee to store the worker-specific data
-// in a separate directory (needs to be done AFTER the queue is initialized when running locally) (step 2)
-const config = new Configuration({
-    purgeOnStart: false,
-    storageClientOptions: {
-        localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
-    },
+// Disable the automatic purge on start, so we don't lose the queue we prepared (step 2)
+const config = new Configuration({ purgeOnStart: false });
+
+// Store the worker's own internal state in a separate directory so workers don't collide (step 2,
+// cont.). Needs to be done AFTER the queue is initialized when running locally. This directory is
+// private to a single worker, so we explicitly set `assumeSoleOwner: true`.
+const storageClient = new FileSystemStorageClient({
+    localDataDirectory: `./storage/worker-${process.env.WORKER_INDEX}`,
+    assumeSoleOwner: true,
 });
 ```
 
+:::note Why no `assumeSoleOwner: false` here?
+
+Each worker's `./storage/worker-N` directory is private to that single worker (nothing else opens it), so the default `assumeSoleOwner: true` is exactly right. The concurrency-safe locking only matters for storage that is genuinely shared across processes, which is the `shop-urls` queue in `requestQueue.mjs`, not this per-worker internal state.
+
+:::
+
 #### Telling the crawler to use the worker configuration
 
 You might have noticed several lines highlighted in the code above. Those show how you provide the shared request queue to the crawler.
 
-You might have also noticed we passed in a second parameter to the constructor of the crawler, the `config` variable we created earlier. This is needed to ensure the crawler uses the worker-specific storages for internal states, and that they do not collide with each other.
+You might have also noticed we passed in the `config` and `storageClient` we created earlier to the crawler. These ensure the crawler uses the worker-specific storages for its own internal state (so the workers do not collide with each other), while still consuming the shared, concurrency-safe `shop-urls` queue we provided explicitly.
 
 #### Why do we use `process.send` instead of `context.pushData`?
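
Note on the guide changes above: the difference between the two `assumeSoleOwner` modes can be illustrated with a toy in-memory model. This is only a sketch under stated assumptions. `openQueue`, `seedDisk`, and `LOCK_MS` are hypothetical names, the `Map` stands in for the on-disk queue, and the real reclaim logic lives in the native extension and may differ in detail.

```javascript
// Toy model only: `disk` stands in for the on-disk queue shared by all processes.
const LOCK_MS = 60_000; // assumed lock duration, purely illustrative

function openQueue(disk, { assumeSoleOwner = true, now = () => Date.now() } = {}) {
    if (assumeSoleOwner) {
        // Sole owner: any lock found on open must be left over from a dead run, so drop it.
        for (const entry of disk.values()) entry.lockedUntil = 0;
    }
    return {
        fetchNextRequest() {
            for (const [uniqueKey, entry] of disk) {
                // A request is available once its lock has expired on the (injected) clock.
                if (entry.lockedUntil <= now()) {
                    entry.lockedUntil = now() + LOCK_MS;
                    return uniqueKey;
                }
            }
            return null;
        },
    };
}

function seedDisk() {
    return new Map([
        ['1', { lockedUntil: 0 }],
        ['2', { lockedUntil: 0 }],
    ]);
}

let clock = 1_000;
const now = () => clock;

// Sole-owner reopen: the dangling lock on '1' is reclaimed immediately.
const diskA = seedDisk();
openQueue(diskA, { assumeSoleOwner: false, now }).fetchNextRequest(); // locks '1', then "crashes"
const soleOwner = openQueue(diskA, { assumeSoleOwner: true, now });
const reclaimed = [soleOwner.fetchNextRequest(), soleOwner.fetchNextRequest()];

// Concurrency-safe reopen: '1' stays locked until the lock expires.
const diskB = seedDisk();
openQueue(diskB, { assumeSoleOwner: false, now }).fetchNextRequest(); // a live peer holds '1'
const peer = openQueue(diskB, { assumeSoleOwner: false, now });
const whileLocked = [peer.fetchNextRequest(), peer.fetchNextRequest()];
clock += LOCK_MS;
const afterExpiry = peer.fetchNextRequest();

console.log(reclaimed, whileLocked, afterExpiry);
```

With the injected clock, the sole-owner reopen hands out both requests right away, while the concurrency-safe reopen hands out only the unlocked one until the lock on the other has expired.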

docs/guides/parallel-scraping/shared.mjs

Lines changed: 10 additions & 2 deletions
@@ -1,8 +1,16 @@
+import { FileSystemStorageClient } from '@crawlee/fs-storage';
 import { RequestQueue } from 'crawlee';
 
 // The request queue shared by all the parallel workers
 let queue;
 
+// The `shop-urls` queue is opened concurrently by every worker process, so it must use the
+// concurrency-safe locking behavior. With `assumeSoleOwner: false`, a request another worker is
+// still processing is treated as a live peer's lock and is not handed out again until that lock
+// expires, so two workers never scrape the same URL at once. (We point at the default `./storage`
+// location, which is where this shared queue lives.)
+const sharedStorageClient = new FileSystemStorageClient({ assumeSoleOwner: false });
+
 /**
  * @param {boolean} makeFresh Whether the queue should be cleared before returning it
  * @returns The queue
@@ -12,11 +20,11 @@ export async function getOrInitQueue(makeFresh = false) {
         return queue;
     }
 
-    queue = await RequestQueue.open('shop-urls');
+    queue = await RequestQueue.open('shop-urls', { storageClient: sharedStorageClient });
 
     if (makeFresh) {
         await queue.drop();
-        queue = await RequestQueue.open('shop-urls');
+        queue = await RequestQueue.open('shop-urls', { storageClient: sharedStorageClient });
     }
 
     return queue;

packages/fs-storage/src/file-system-storage.ts

Lines changed: 28 additions & 1 deletion
@@ -25,6 +25,22 @@ export interface FileSystemStorageOptions {
      * Optional logger for FileSystemStorageClient warnings.
      */
     logger?: CrawleeLogger;
+
+    /**
+     * Assert that this process is the *sole* consumer of every request queue it opens.
+     *
+     * When `true` (the default), opening a queue immediately reclaims any requests that a previous
+     * run left *in progress* (e.g. after a crash), so they become fetchable again right away. This is
+     * the right behavior for the common single-process crawl.
+     *
+     * Set this to `false` if multiple processes share the same on-disk request queue concurrently
+     * (for example, the {@apilink parallel scraping setup | "Parallel Scraping Guide"}). In that mode
+     * an in-progress request is treated as a potential live peer's lock and is only reclaimed once
+     * that lock expires on the wall clock, so two workers won't process the same request at once.
+     *
+     * @default true
+     */
+    assumeSoleOwner?: boolean;
 }
 
 /**
/**
@@ -41,6 +57,7 @@ export class FileSystemStorageClient implements storage.StorageClient {
     readonly keyValueStoresDirectory: string;
     readonly requestQueuesDirectory: string;
     readonly logger?: CrawleeLogger;
+    readonly assumeSoleOwner: boolean;
 
     readonly keyValueStoreCache: KeyValueStoreClient[] = [];
     readonly datasetClientCache: DatasetClient[] = [];
@@ -49,9 +66,11 @@ export class FileSystemStorageClient implements storage.StorageClient {
     constructor(options: FileSystemStorageOptions = {}) {
         s.object({
             localDataDirectory: s.string().optional(),
+            assumeSoleOwner: s.boolean().optional(),
         }).parse(options);
 
         this.logger = options.logger;
+        this.assumeSoleOwner = options.assumeSoleOwner ?? true;
 
         // v3.0.0 used `crawlee_storage` as the default, we changed this in v3.0.1 to just `storage`,
         // this function handles it without making BC breaks - it respects existing `crawlee_storage`
@@ -165,7 +184,15 @@ export class FileSystemStorageClient implements storage.StorageClient {
             }
         }
 
-        const nativeClient = await NativeRequestQueueClient.open(id, name, alias, this.localDataDirectory);
+        const nativeClient = await NativeRequestQueueClient.open(
+            id,
+            name,
+            alias,
+            this.localDataDirectory,
+            // useTestClock: always real wall-clock outside of native tests.
+            undefined,
+            this.assumeSoleOwner,
+        );
         const newStore = await RequestQueueClient.create({
             name: alias ? undefined : (name ?? cacheKey),
             cacheKey: cacheKey ?? '',
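
Note on the constructor change above: the default is applied with `??` rather than `||`, which matters for a boolean option whose meaningful non-default value is `false`. A minimal sketch of the difference, using two hypothetical stand-alone functions (`resolveWithNullish` and `resolveWithOr` are illustrative names, not part of the package):

```javascript
// Hypothetical stand-ins for the constructor's defaulting logic.
function resolveWithNullish(options = {}) {
    // `??` only falls back when the value is null or undefined, so an explicit `false` survives.
    return options.assumeSoleOwner ?? true;
}

function resolveWithOr(options = {}) {
    // `||` falls back on ANY falsy value, so an explicit `false` would be silently overridden.
    return options.assumeSoleOwner || true;
}

console.log(resolveWithNullish({}));                         // true
console.log(resolveWithNullish({ assumeSoleOwner: false })); // false
console.log(resolveWithOr({ assumeSoleOwner: false }));      // true
```

With `||`, the concurrency-safe mode could never be enabled, which is the case the `respects an explicit false` test in this commit guards against.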
Lines changed: 85 additions & 0 deletions
@@ -0,0 +1,85 @@
+import { rm } from 'node:fs/promises';
+import { resolve } from 'node:path';
+
+import { FileSystemStorageClient } from '@crawlee/fs-storage';
+
+// `assumeSoleOwner` controls how the native `@crawlee/fs-storage-native` extension treats requests
+// left *in progress* by a previous run (a dangling `orderNo` lock on disk) when a queue is reopened.
+// The reclaim/respect-peer-lock semantics are owned by the native extension; these tests verify the
+// adapter's contract on top of it: the option defaults to `true`, is honored when set, and that the
+// resulting behavior reaches all the way down to the native queue.
+describe('FileSystemStorageClient assumeSoleOwner', () => {
+    const tmpLocation = resolve(import.meta.dirname, './tmp/assume-sole-owner');
+
+    afterEach(async () => {
+        await rm(tmpLocation, { force: true, recursive: true });
+    });
+
+    test('defaults to true', () => {
+        const storage = new FileSystemStorageClient({ localDataDirectory: tmpLocation });
+        expect(storage.assumeSoleOwner).toBe(true);
+    });
+
+    test('respects an explicit false', () => {
+        const storage = new FileSystemStorageClient({ localDataDirectory: tmpLocation, assumeSoleOwner: false });
+        expect(storage.assumeSoleOwner).toBe(false);
+    });
+
+    // Seed a queue with two requests, fetch (lock) one without handling it or tearing down, leaving a
+    // dangling in-progress lock on disk, exactly the "process died mid-flight" situation.
+    async function seedQueueWithDanglingLock(dir: string) {
+        const storage = new FileSystemStorageClient({ localDataDirectory: dir });
+        const queue = await storage.createRequestQueueClient({ name: 'default' });
+        await queue.addBatchOfRequests([
+            { url: 'http://example.com/1', uniqueKey: '1' },
+            { url: 'http://example.com/2', uniqueKey: '2' },
+        ]);
+        const locked = await queue.fetchNextRequest();
+        expect(locked).not.toBeNull();
+        // Intentionally NO markRequestAsHandled and NO teardown/persistState; the lock is left dangling.
+        return locked!;
+    }
+
+    test('true (default): reopening preserves contents but relinquishes the dangling lock', async () => {
+        const dir = resolve(tmpLocation, 'sole-owner-true');
+        const locked = await seedQueueWithDanglingLock(dir);
+
+        // Reopen the same directory as sole owner, without purging.
+        const reopened = new FileSystemStorageClient({ localDataDirectory: dir, assumeSoleOwner: true });
+        const queue = await reopened.createRequestQueueClient({ name: 'default' });
+
+        // Contents preserved: both requests still present, none handled.
+        const metadata = await queue.getMetadata();
+        expect(metadata.totalRequestCount).toBe(2);
+        expect(metadata.handledRequestCount).toBe(0);
+        expect(metadata.pendingRequestCount).toBe(2);
+
+        // Lock relinquished: BOTH requests are fetchable again, including the one locked before.
+        const a = await queue.fetchNextRequest();
+        const b = await queue.fetchNextRequest();
+        expect([a?.uniqueKey, b?.uniqueKey].sort()).toStrictEqual(['1', '2']);
+        // The previously-locked request survived with its data intact.
+        const reFetched = await queue.getRequest(locked.uniqueKey);
+        expect(reFetched?.url).toBe(locked.url);
+    });
+
+    test('false: reopening keeps the dangling lock (concurrency-safe mode)', async () => {
+        const dir = resolve(tmpLocation, 'sole-owner-false');
+        await seedQueueWithDanglingLock(dir);
+
+        // Reopen in concurrency-safe mode: an in-progress request is treated as a potential live peer's
+        // lock and is NOT reclaimed until it expires.
+        const reopened = new FileSystemStorageClient({ localDataDirectory: dir, assumeSoleOwner: false });
+        const queue = await reopened.createRequestQueueClient({ name: 'default' });
+
+        // Contents are still preserved...
+        const metadata = await queue.getMetadata();
+        expect(metadata.totalRequestCount).toBe(2);
+        expect(metadata.pendingRequestCount).toBe(2);
+
+        // ...but only the un-locked request is handed out; the locked one stays in progress.
+        const a = await queue.fetchNextRequest();
+        expect(a?.uniqueKey).toBe('2');
+        expect(await queue.fetchNextRequest()).toBeNull();
+    });
+});
