diff --git a/package-lock.json b/package-lock.json index 2002fab0cb..bc2b10be8c 100644 --- a/package-lock.json +++ b/package-lock.json @@ -9,7 +9,7 @@ "version": "3.6.0", "license": "Apache-2.0", "dependencies": { - "@apify/consts": "^2.47.1", + "@apify/consts": "^2.51.0", "@apify/input_secrets": "^1.2.0", "@apify/log": "^2.4.3", "@apify/timeout": "^0.3.0", diff --git a/package.json b/package.json index dee8d4d8f1..106f554e5e 100644 --- a/package.json +++ b/package.json @@ -76,7 +76,7 @@ "*": "prettier --write --ignore-unknown" }, "dependencies": { - "@apify/consts": "^2.47.1", + "@apify/consts": "^2.51.0", "@apify/input_secrets": "^1.2.0", "@apify/log": "^2.4.3", "@apify/timeout": "^0.3.0", diff --git a/src/actor.ts b/src/actor.ts index 1c83f94c28..cf6ff7aeca 100644 --- a/src/actor.ts +++ b/src/actor.ts @@ -14,7 +14,6 @@ import { EventType, purgeDefaultStorages, RequestQueue, - StorageManager, } from '@crawlee/core'; import type { Awaitable, @@ -65,6 +64,12 @@ import { import { PlatformEventManager } from './platform_event_manager.js'; import type { ProxyConfigurationOptions } from './proxy_configuration.js'; import { ProxyConfiguration } from './proxy_configuration.js'; +import type { + OpenStorageOptions, + StorageIdentifier, + StorageIdentifierWithoutAlias, +} from './storage.js'; +import { openStorage } from './storage.js'; import { checkCrawleeVersion, getSystemInfo } from './utils.js'; export interface InitOptions { @@ -360,15 +365,6 @@ export interface RebootOptions { customAfterSleepMillis?: number; } -export interface OpenStorageOptions { - /** - * If set to `true` then the cloud storage is used even if the `CRAWLEE_STORAGE_DIR` - * environment variable is set. This way it is possible to combine local and cloud storage. - * @default false - */ - forceCloud?: boolean; -} - export { ClientActorRun as ActorRun }; /** @@ -429,6 +425,13 @@ export class Actor { private chargingManager: ChargingManager; + /** + * Tracks which aliased storages have been purged during this session, + * so we only purge them once (on first open) when running locally. + * @internal + */ + purgedStorageAliases = new Set(); + constructor(options: ConfigurationOptions = {}) { // use default configuration object if nothing overridden (it fallbacks to env vars) this.config = @@ -1122,16 +1125,17 @@ export class Actor { * For more details and code examples, see the {@apilink Dataset} class. * * @param [datasetIdOrName] - * ID or name of the dataset to be opened. If `null` or `undefined`, + * ID, name, or alias of the dataset to be opened. If `null` or `undefined`, * the function returns the default dataset associated with the Actor run. + * You can also pass `{ alias: 'name' }` to open a dataset defined in the Actor's schema storages, + * `{ id: 'abc' }` to open by explicit ID, or `{ name: 'abc' }` to open by explicit name. * @param [options] * @ignore */ async openDataset( - datasetIdOrName?: string | null, + datasetIdOrName?: StorageIdentifier | null, options: OpenStorageOptions = {}, ): Promise> { - ow(datasetIdOrName, ow.optional.string); ow( options, ow.object.exactShape({ @@ -1313,14 +1317,14 @@ export class Actor { * @param [storeIdOrName] * ID or name of the key-value store to be opened. If `null` or `undefined`, * the function returns the default key-value store associated with the Actor run. + * You can also pass `{ id: 'abc' }` to open by explicit ID, or `{ name: 'abc' }` to open by explicit name. * @param [options] * @ignore */ async openKeyValueStore( - storeIdOrName?: string | null, + storeIdOrName?: StorageIdentifierWithoutAlias | null, options: OpenStorageOptions = {}, ): Promise { - ow(storeIdOrName, ow.optional.string); ow( options, ow.object.exactShape({ @@ -1347,14 +1351,14 @@ export class Actor { * @param [queueIdOrName] * ID or name of the request queue to be opened. If `null` or `undefined`, * the function returns the default request queue associated with the Actor run. + * You can also pass `{ id: 'abc' }` to open by explicit ID, or `{ name: 'abc' }` to open by explicit name. * @param [options] * @ignore */ async openRequestQueue( - queueIdOrName?: string | null, + queueIdOrName?: StorageIdentifierWithoutAlias | null, options: OpenStorageOptions = {}, ): Promise { - ow(queueIdOrName, ow.optional.string); ow( options, ow.object.exactShape({ @@ -2023,12 +2027,14 @@ export class Actor { * For more details and code examples, see the {@apilink Dataset} class. * * @param [datasetIdOrName] - * ID or name of the dataset to be opened. If `null` or `undefined`, + * ID, name, or alias of the dataset to be opened. If `null` or `undefined`, * the function returns the default dataset associated with the Actor run. + * You can also pass `{ alias: 'name' }` to open a dataset defined in the Actor's schema storages, + * `{ id: 'abc' }` to open by explicit ID, or `{ name: 'abc' }` to open by explicit name. * @param [options] */ static async openDataset( - datasetIdOrName?: string | null, + datasetIdOrName?: StorageIdentifier | null, options: OpenStorageOptions = {}, ): Promise> { return Actor.getDefaultInstance().openDataset(datasetIdOrName, options); @@ -2158,10 +2164,11 @@ export class Actor { * @param [storeIdOrName] * ID or name of the key-value store to be opened. If `null` or `undefined`, * the function returns the default key-value store associated with the Actor run. + * You can also pass `{ id: 'abc' }` to open by explicit ID, or `{ name: 'abc' }` to open by explicit name. * @param [options] */ static async openKeyValueStore( - storeIdOrName?: string | null, + storeIdOrName?: StorageIdentifierWithoutAlias | null, options: OpenStorageOptions = {}, ): Promise { return Actor.getDefaultInstance().openKeyValueStore( @@ -2184,10 +2191,11 @@ export class Actor { * @param [queueIdOrName] * ID or name of the request queue to be opened. If `null` or `undefined`, * the function returns the default request queue associated with the Actor run. + * You can also pass `{ id: 'abc' }` to open by explicit ID, or `{ name: 'abc' }` to open by explicit name. * @param [options] */ static async openRequestQueue( - queueIdOrName?: string | null, + queueIdOrName?: StorageIdentifierWithoutAlias | null, options: OpenStorageOptions = {}, ): Promise { return Actor.getDefaultInstance().openRequestQueue( @@ -2366,16 +2374,14 @@ export class Actor { private async _openStorage( storageClass: Constructor, - id?: string, + identifier?: StorageIdentifier | null, options: OpenStorageOptions = {}, ) { - const client = options.forceCloud ? this.apifyClient : undefined; - return StorageManager.openStorage( - storageClass, - id, - client, - this.config, - ); + return openStorage(storageClass, identifier, { + config: this.config, + client: options.forceCloud ? this.apifyClient : undefined, + purgedStorageAliases: this.purgedStorageAliases, + }); } private _ensureActorInit(methodCalled: string) { diff --git a/src/configuration.ts b/src/configuration.ts index c5bb9f776d..85ec1246be 100644 --- a/src/configuration.ts +++ b/src/configuration.ts @@ -41,6 +41,7 @@ export interface ConfigurationOptions extends CoreConfigurationOptions { useChargingLogDataset?: boolean; actorPricingInfo?: string; chargedEventCounts?: string; + actorStoragesJson?: string; } /** @@ -185,6 +186,7 @@ export class Configuration extends CoreConfiguration { ACTOR_USE_CHARGING_LOG_DATASET: 'useChargingLogDataset', APIFY_ACTOR_PRICING_INFO: 'actorPricingInfo', APIFY_CHARGED_ACTOR_EVENT_COUNTS: 'chargedEventCounts', + ACTOR_STORAGES_JSON: 'actorStoragesJson', }; protected static override INTEGER_VARS = [ diff --git a/src/index.ts b/src/index.ts index e3e594c502..b628e41b20 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,4 +1,12 @@ export * from './actor.js'; +export type { + OpenStorageOptions, + StorageAlias, + StorageId, + StorageName, + StorageIdentifier, + StorageIdentifierWithoutAlias, +} from './storage.js'; export { ChargeOptions, ChargeResult, diff --git a/src/storage.ts b/src/storage.ts new file mode 100644 index 0000000000..109fb4afea --- /dev/null +++ b/src/storage.ts @@ -0,0 +1,201 @@ +import type { IStorage } from '@crawlee/core'; +import { StorageManager } from '@crawlee/core'; +import type { Constructor, StorageClient } from '@crawlee/types'; +import { ApifyClient } from 'apify-client'; + +import type { Configuration } from './configuration.js'; + +export interface OpenStorageOptions { + /** + * If set to `true` then the cloud storage is used even if the `CRAWLEE_STORAGE_DIR` + * environment variable is set. This way it is possible to combine local and cloud storage. + * @default false + */ + forceCloud?: boolean; +} + +/** + * Identifies a storage by its alias from the Actor's schema storages + * (resolved via the `ACTOR_STORAGES_JSON` environment variable). + */ +export interface StorageAlias { + alias: string; +} + +/** + * Identifies a storage by its platform ID. + */ +export interface StorageId { + id: string; +} + +/** + * Identifies a storage by its name. + */ +export interface StorageName { + name: string; +} + +/** + * Identifies a storage to open. Can be: + * - A plain `string` for backward compatibility (treated as ID or name) + * - `{ alias: string }` to resolve from the Actor's schema storages (`ACTOR_STORAGES_JSON`) + * - `{ id: string }` to open by explicit platform ID + * - `{ name: string }` to open by explicit name + */ +export type StorageIdentifier = string | StorageAlias | StorageId | StorageName; + +/** + * Identifies a storage to open, without alias support. + * Used for key-value stores and request queues, which do not support aliases. + * Can be: + * - A plain `string` for backward compatibility (treated as ID or name) + * - `{ id: string }` to open by explicit platform ID + * - `{ name: string }` to open by explicit name + */ +export type StorageIdentifierWithoutAlias = string | StorageId | StorageName; + +/** + * The parsed shape of the `ACTOR_STORAGES_JSON` environment variable. + */ +interface ActorStorages { + datasets: Record; + keyValueStores: Record; + requestQueues: Record; +} + +const STORAGE_TYPE_KEYS: Record = { + Dataset: 'datasets', + KeyValueStore: 'keyValueStores', + RequestQueue: 'requestQueues', +}; + +const parsedStoragesJson = new Map(); + +/** + * Resolves a {@link StorageIdentifier} to a plain string ID or name + * that can be passed to Crawlee's `StorageManager.openStorage()`. + */ +function resolveStorageIdentifier( + storageType: 'Dataset' | 'KeyValueStore' | 'RequestQueue', + identifier: StorageIdentifier | null | undefined, + config: Configuration, +): string | undefined { + if (identifier === null || identifier === undefined) { + return undefined; + } + + if (typeof identifier === 'string') { + return identifier; + } + + if ('id' in identifier) { + return identifier.id; + } + + if ('name' in identifier) { + return identifier.name; + } + + // { alias: string } + const storagesJson = config.get('actorStoragesJson'); + if (config.get('isAtHome') && storagesJson) { + let storages: ActorStorages; + try { + if (!parsedStoragesJson.has(storagesJson)) { + parsedStoragesJson.set(storagesJson, JSON.parse(storagesJson)); + } + storages = parsedStoragesJson.get(storagesJson)!; + } catch { + throw new Error( + `Failed to parse ACTOR_STORAGES_JSON environment variable: ${storagesJson}`, + ); + } + + const typeKey = STORAGE_TYPE_KEYS[storageType]; + const resolvedId = storages[typeKey]?.[identifier.alias]; + + if (resolvedId) { + return resolvedId; + } + + throw new Error( + `Storage alias "${identifier.alias}" not found in ACTOR_STORAGES_JSON for storage type "${storageType}". ` + + `Available aliases: ${Object.keys(storages[typeKey] ?? {}).join(', ') || '(none)'}`, + ); + } + + // When using local storage, just use the alias as a name. + // When using platform storage, we can't just make up a name — the alias must be + // in ACTOR_STORAGES_JSON. + if (config.get('isAtHome')) { + throw new Error( + `Storage alias "${identifier.alias}" cannot be resolved because ACTOR_STORAGES_JSON is not set. ` + + `Aliases are only available for storages declared in the Actor's schema.`, + ); + } + + return identifier.alias; +} + +export interface OpenStorageContext { + config: Configuration; + client?: StorageClient; + purgedStorageAliases: Set; +} + +/** + * Opens a storage by its identifier, handling alias resolution and local purging. + */ +export async function openStorage( + storageClass: Constructor, + identifier: StorageIdentifier | null | undefined, + context: OpenStorageContext, +): Promise { + const isAlias = + identifier !== null && + identifier !== undefined && + typeof identifier === 'object' && + 'alias' in identifier; + + if ( + isAlias && + !context.config.get('isAtHome') && + context.client instanceof ApifyClient + ) { + throw new Error( + 'The `alias` option is not allowed for Apify-based storages running outside of Apify', + ); + } + + const resolvedIdOrName = resolveStorageIdentifier( + storageClass.name as 'Dataset' | 'KeyValueStore' | 'RequestQueue', + identifier, + context.config, + ); + + // When running locally, purge aliased storages on first open + // (similar to how Crawlee purges default storages on start) + if ( + isAlias && + !context.config.get('isAtHome') && + context.config.get('purgeOnStart') && + !context.purgedStorageAliases.has(identifier.alias) + ) { + context.purgedStorageAliases.add(identifier.alias); + const existingStorage = await StorageManager.openStorage( + storageClass, + resolvedIdOrName, + context.client, + context.config, + ); + await (existingStorage as T & { drop(): Promise }).drop(); + } + + return StorageManager.openStorage( + storageClass, + resolvedIdOrName, + context.client, + context.config, + ); +} diff --git a/test/apify/actor.test.ts b/test/apify/actor.test.ts index 040ab52dbe..08e838c0a5 100644 --- a/test/apify/actor.test.ts +++ b/test/apify/actor.test.ts @@ -785,6 +785,156 @@ describe('Actor', () => { sdk.apifyClient, ); }); + + describe('StorageIdentifier support', () => { + const STORAGES_JSON = JSON.stringify({ + datasets: { + custom: 'dataset-id-123', + default: 'default-ds-id', + }, + keyValueStores: { + custom: 'kvs-id-456', + default: 'default-kvs-id', + }, + requestQueues: { + custom: 'rq-id-789', + default: 'default-rq-id', + }, + }); + + test('openDataset with { alias } resolves from ACTOR_STORAGES_JSON', async () => { + sdk.config.set('actorStoragesJson', STORAGES_JSON); + process.env[APIFY_ENV_VARS.IS_AT_HOME] = '1'; + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + mockOpenStorage.mockResolvedValueOnce(vitest.fn()); + await sdk.openDataset({ alias: 'custom' }); + expect(mockOpenStorage).toBeCalledTimes(1); + expect(mockOpenStorage).toBeCalledWith( + 'dataset-id-123', + undefined, + ); + delete process.env[APIFY_ENV_VARS.IS_AT_HOME]; + }); + + test('openDataset with { alias } throws when alias not found in ACTOR_STORAGES_JSON', async () => { + sdk.config.set('actorStoragesJson', STORAGES_JSON); + process.env[APIFY_ENV_VARS.IS_AT_HOME] = '1'; + await expect( + sdk.openDataset({ alias: 'nonexistent' }), + ).rejects.toThrow( + /Storage alias "nonexistent" not found in ACTOR_STORAGES_JSON/, + ); + delete process.env[APIFY_ENV_VARS.IS_AT_HOME]; + }); + + test('openDataset with { alias } uses alias as name when ACTOR_STORAGES_JSON not set', async () => { + // No actorStoragesJson set — should use alias as a name for local storage + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + // First call is for the purge (drop), second for re-open + const mockStorage = { drop: vitest.fn() }; + mockOpenStorage.mockResolvedValueOnce(mockStorage); + mockOpenStorage.mockResolvedValueOnce(vitest.fn()); + await sdk.openDataset({ alias: 'my-local-ds' }); + // First open + drop, then re-open + expect(mockOpenStorage).toBeCalledTimes(2); + expect(mockOpenStorage).toBeCalledWith( + 'my-local-ds', + undefined, + ); + expect(mockStorage.drop).toBeCalledTimes(1); + }); + + test('openDataset with { alias } locally only purges once per alias', async () => { + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + const mockStorage = { drop: vitest.fn() }; + // First call: purge open, second: re-open after drop, third: second open (no purge) + mockOpenStorage.mockResolvedValue(mockStorage); + await sdk.openDataset({ alias: 'once-only' }); + expect(mockStorage.drop).toBeCalledTimes(1); + expect(mockOpenStorage).toBeCalledTimes(2); + + mockOpenStorage.mockClear(); + mockStorage.drop.mockClear(); + await sdk.openDataset({ alias: 'once-only' }); + // Second time: no purge, just one open call + expect(mockStorage.drop).not.toBeCalled(); + expect(mockOpenStorage).toBeCalledTimes(1); + }); + + test('openDataset with { id } passes ID directly', async () => { + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + mockOpenStorage.mockResolvedValueOnce(vitest.fn()); + await sdk.openDataset({ id: 'explicit-id' }); + expect(mockOpenStorage).toBeCalledTimes(1); + expect(mockOpenStorage).toBeCalledWith( + 'explicit-id', + undefined, + ); + }); + + test('openDataset with { name } passes name directly', async () => { + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + mockOpenStorage.mockResolvedValueOnce(vitest.fn()); + await sdk.openDataset({ name: 'explicit-name' }); + expect(mockOpenStorage).toBeCalledTimes(1); + expect(mockOpenStorage).toBeCalledWith( + 'explicit-name', + undefined, + ); + }); + + test('openDataset with plain string (backward compat) passes through', async () => { + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + mockOpenStorage.mockResolvedValueOnce(vitest.fn()); + await sdk.openDataset('my-dataset'); + expect(mockOpenStorage).toBeCalledTimes(1); + expect(mockOpenStorage).toBeCalledWith( + 'my-dataset', + undefined, + ); + }); + + test('openDataset with no argument opens default storage', async () => { + const mockOpenStorage = vitest.spyOn( + StorageManager.prototype, + 'openStorage', + ); + mockOpenStorage.mockResolvedValueOnce(vitest.fn()); + await sdk.openDataset(); + expect(mockOpenStorage).toBeCalledTimes(1); + expect(mockOpenStorage).toBeCalledWith( + undefined, + undefined, + ); + }); + + test('throws on malformed ACTOR_STORAGES_JSON', async () => { + sdk.config.set('actorStoragesJson', '{not valid json'); + process.env[APIFY_ENV_VARS.IS_AT_HOME] = '1'; + await expect( + sdk.openDataset({ alias: 'custom' }), + ).rejects.toThrow(/Failed to parse ACTOR_STORAGES_JSON/); + delete process.env[APIFY_ENV_VARS.IS_AT_HOME]; + }); + }); }); }); diff --git a/test/e2e/sdk/multiStorage/.actor/actor.json b/test/e2e/sdk/multiStorage/.actor/actor.json new file mode 100644 index 0000000000..88133cf484 --- /dev/null +++ b/test/e2e/sdk/multiStorage/.actor/actor.json @@ -0,0 +1,27 @@ +{ + "actorSpecification": 1, + "name": "apify-sdk-js-test-multi-storage", + "version": "0.0", + "storages": { + "datasets": { + "default": { + "actorSpecification": 1, + "fields": { + "properties": { + "url": { "type": "string" }, + "title": { "type": "string" } + } + } + }, + "results": { + "actorSpecification": 1, + "fields": { + "properties": { + "url": { "type": "string" }, + "title": { "type": "string" } + } + } + } + } + } +} diff --git a/test/e2e/sdk/multiStorage/src/main.mjs b/test/e2e/sdk/multiStorage/src/main.mjs new file mode 100644 index 0000000000..cb309096d4 --- /dev/null +++ b/test/e2e/sdk/multiStorage/src/main.mjs @@ -0,0 +1,19 @@ +import { Actor } from 'apify'; + +const actor = new Actor(); + +await actor.init(); + +// Open storages by alias — these should resolve via ACTOR_STORAGES_JSON +const resultsDataset = await actor.openDataset({ alias: 'results' }); + +// Write data to the aliased dataset +await resultsDataset.pushData([ + { url: 'https://example.com', title: 'Example' }, + { url: 'https://example.org', title: 'Example Org' }, +]); + +// Store the ID so the test script can find them +await Actor.setValue('ALIASED_DATASET_ID', resultsDataset.id); + +await actor.exit(); diff --git a/test/e2e/sdk/multiStorage/test.mjs b/test/e2e/sdk/multiStorage/test.mjs new file mode 100644 index 0000000000..99bae809cc --- /dev/null +++ b/test/e2e/sdk/multiStorage/test.mjs @@ -0,0 +1,56 @@ +import assert from 'node:assert/strict'; +import test from 'node:test'; + +import { ApifyClient, Dataset, KeyValueStore } from 'apify'; +import { sleep } from 'crawlee'; + +const client = new ApifyClient({ + token: process.env.APIFY_TOKEN, +}); + +const actor = client.actor(process.argv[2]); + +const runActor = async (input = {}, options = {}) => { + const { id: runId } = await actor.call(input, options); + await client.run(runId).waitForFinish(); + await sleep(6000); // wait for updates to propagate to MongoDB + return await client.run(runId).get(); +}; + +test('aliased storages are resolved and written to correctly', async () => { + const run = await runActor(); + + assert.strictEqual(run.status, 'SUCCEEDED'); + + // Read the aliased storage IDs that the actor stored in the default KVS + const defaultStore = await KeyValueStore.open(run.defaultKeyValueStoreId, { + storageClient: client, + }); + + const aliasedDatasetId = await defaultStore.getValue('ALIASED_DATASET_ID'); + + assert.ok(aliasedDatasetId, 'Aliased dataset ID must be present'); + + // The aliased storages should be different from the default ones + assert.notEqual( + aliasedDatasetId, + run.defaultDatasetId, + 'Aliased dataset should differ from the default dataset', + ); + + // Verify data in the aliased dataset + const aliasedDataset = await Dataset.open(aliasedDatasetId, { + storageClient: client, + }); + const datasetData = await aliasedDataset.getData(); + + assert.strictEqual( + datasetData.count, + 2, + 'Aliased dataset should have 2 items', + ); + assert.strictEqual(datasetData.items[0].url, 'https://example.com'); + assert.strictEqual(datasetData.items[0].title, 'Example'); + assert.strictEqual(datasetData.items[1].url, 'https://example.org'); + assert.strictEqual(datasetData.items[1].title, 'Example Org'); +}); diff --git a/test/e2e/sdk/multiStorageLocal/src/lifecycle.mjs b/test/e2e/sdk/multiStorageLocal/src/lifecycle.mjs new file mode 100644 index 0000000000..16de4aff44 --- /dev/null +++ b/test/e2e/sdk/multiStorageLocal/src/lifecycle.mjs @@ -0,0 +1,50 @@ +import { Actor, ApifyClient, Dataset } from 'apify'; + +const client = new ApifyClient({ + token: process.env.APIFY_TOKEN, +}); + +// Simulate local environment by removing platform env vars +delete process.env.APIFY_IS_AT_HOME; +delete process.env.ACTOR_STORAGES_JSON; + +const actor = new Actor({ + isAtHome: false, + logLevel: 'DEBUG', +}); + +await actor.init(); + +// Open storages by alias — locally, this should use the alias as the storage name +const resultsDataset = await actor.openDataset({ alias: 'results' }); + +// Write data to the aliased storages +await resultsDataset.pushData([ + { url: 'https://example.com', title: 'Example' }, + { url: 'https://example.org', title: 'Example Org' }, +]); + +// Verify purge-on-first-open: open the same alias again and write more data. +// The previously written data should still be there (no second purge). +const resultsDatasetAgain = await actor.openDataset({ alias: 'results' }); +await resultsDatasetAgain.pushData([ + { url: 'https://example.net', title: 'Example Net' }, +]); + +// Read back all data from the aliased dataset +const allData = await resultsDatasetAgain.getData(); + +// Transfer results to the platform's default dataset so the test script can verify +const run = await client.run(process.env.ACTOR_RUN_ID).get(); +const platformDataset = await Dataset.open(run.defaultDatasetId, { + storageClient: client, +}); + +await platformDataset.pushData([ + { + datasetItemCount: allData.count, + datasetItems: allData.items, + }, +]); + +await actor.exit(); diff --git a/test/e2e/sdk/multiStorageLocal/src/main.mjs b/test/e2e/sdk/multiStorageLocal/src/main.mjs new file mode 100644 index 0000000000..3ded56df85 --- /dev/null +++ b/test/e2e/sdk/multiStorageLocal/src/main.mjs @@ -0,0 +1,23 @@ +import { execFileSync } from 'node:child_process'; +import { dirname, join } from 'node:path'; +import { fileURLToPath } from 'node:url'; + +import { log } from '@apify/log'; + +const dir = dirname(fileURLToPath(import.meta.url)); +const lifecyclePath = join(dir, 'lifecycle.mjs'); + +// Run two separate "lifecycle" processes that share the same filesystem. +// The second process should purge the aliased dataset on first open, +// proving that purge-on-first-open works across Actor restarts. +for (const phase of ['first', 'second']) { + log.info(`--- Running ${phase} lifecycle ---`); + execFileSync('node', [lifecyclePath], { + stdio: 'inherit', + env: process.env, + }); +} + +// Both lifecycle processes pushed a summary to the platform default dataset. +// The test script will verify both summaries. +log.info('Both lifecycles completed successfully'); diff --git a/test/e2e/sdk/multiStorageLocal/test.mjs b/test/e2e/sdk/multiStorageLocal/test.mjs new file mode 100644 index 0000000000..435c45b870 --- /dev/null +++ b/test/e2e/sdk/multiStorageLocal/test.mjs @@ -0,0 +1,63 @@ +import assert from 'node:assert/strict'; +import test from 'node:test'; + +import { ApifyClient, Dataset } from 'apify'; +import { sleep } from 'crawlee'; + +const client = new ApifyClient({ + token: process.env.APIFY_TOKEN, +}); + +const actor = client.actor(process.argv[2]); + +const runActor = async (input = {}, options = {}) => { + const { id: runId } = await actor.call(input, options); + await client.run(runId).waitForFinish(); + await sleep(6000); // wait for updates to propagate to MongoDB + return await client.run(runId).get(); +}; + +test('aliased storages work locally with purge-on-first-open across restarts', async () => { + // The actor runs two lifecycle processes in sequence, sharing the same filesystem. + // Each lifecycle creates a fresh Actor instance (resetting purgedStorageAliases), + // opens the aliased dataset (triggering purge on first open), writes data, and + // pushes a summary to the platform default dataset. + const run = await runActor(); + + assert.strictEqual(run.status, 'SUCCEEDED'); + + const dataset = await Dataset.open(run.defaultDatasetId, { + storageClient: client, + }); + const data = await dataset.getData(); + + assert.strictEqual( + data.count, + 2, + 'There must be exactly two summary items (one per lifecycle)', + ); + + // First lifecycle: fresh local storage, writes 3 items total (2 + 1 from second open) + const firstSummary = data.items[0]; + assert.strictEqual( + firstSummary.datasetItemCount, + 3, + 'First lifecycle: aliased dataset should have 3 items (purge only happened on first open)', + ); + assert.strictEqual(firstSummary.datasetItems[0].url, 'https://example.com'); + assert.strictEqual(firstSummary.datasetItems[0].title, 'Example'); + assert.strictEqual(firstSummary.datasetItems[1].url, 'https://example.org'); + assert.strictEqual(firstSummary.datasetItems[1].title, 'Example Org'); + assert.strictEqual(firstSummary.datasetItems[2].url, 'https://example.net'); + assert.strictEqual(firstSummary.datasetItems[2].title, 'Example Net'); + + // Second lifecycle: the aliased dataset from the first lifecycle is still on disk. + // The second lifecycle's first openDataset({ alias: 'results' }) should purge it, + // then write 3 fresh items. If purge didn't work, there would be 6 items. + const secondSummary = data.items[1]; + assert.strictEqual( + secondSummary.datasetItemCount, + 3, + 'Second lifecycle: aliased dataset should have 3 items (stale data from first lifecycle was purged)', + ); +});