Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/upgrading/upgrading_v4.md
Original file line number Diff line number Diff line change
Expand Up @@ -619,6 +619,10 @@ The high-level storage classes (`Dataset`, `KeyValueStore`, `RequestQueue`) now

`timeoutSecs` and `doNotRetryTimeouts` were removed from `RecordOptions` (used by `KeyValueStore.setValue`). Only `contentType` remains.

### `maybeStringify` is removed

The `maybeStringify` helper exported from `@crawlee/core` has been removed. Value (de)serialization now lives entirely in the `KeyValueStore` frontend: writing serializes the value (and infers its content type), reading parses it back, and the storage client is a plain byte transport. If you imported `maybeStringify` directly, use the `serializeValue` / `parseValue` functions exported from `@crawlee/core` instead. Note that `serializeValue(value, contentType)` does not mutate an options object the way `maybeStringify` did; it returns a `{ value, contentType }` pair.

### `KeyValueStoreIteratorOptions` simplified

`exclusiveStartKey` and `collection` were removed. Only `prefix` remains.
Expand Down
1 change: 1 addition & 0 deletions packages/core/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
"@crawlee/utils": "workspace:*",
"@sapphire/async-queue": "^1.5.5",
"@vladfrangu/async_event_emitter": "^2.4.6",
"content-type": "^1.0.5",
"csv-stringify": "^6.5.2",
"json5": "^2.2.3",
"minimatch": "^10.0.1",
Expand Down
1 change: 1 addition & 0 deletions packages/core/src/storages/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
export * from './dataset.js';
export * from './key_value_store.js';
export * from './key_value_store_codec.js';
export * from './request_list.js';
export type * from './request_loader.js';
export type * from './request_manager.js';
Expand Down
131 changes: 84 additions & 47 deletions packages/core/src/storages/key_value_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,53 +6,21 @@ import JSON5 from 'json5';
import ow, { ArgumentError } from 'ow';

import { KEY_VALUE_STORE_KEY_REGEX } from '@apify/consts';
import { jsonStringifyExtended } from '@apify/utilities';

import { Configuration } from '../configuration.js';
import { serviceLocator } from '../service_locator.js';
import type { Awaitable } from '../typedefs.js';
import { checkStorageAccess } from './access_checking.js';
import { parseValue, serializeValue } from './key_value_store_codec.js';
import type { StorageIdentifier } from './storage_instance_manager.js';
import type { StorageOpenOptions } from './utils.js';
import { resolveStorageIdentifier } from './storage_instance_manager.js';
import { createDualIterable, purgeDefaultStorages } from './utils.js';
import { isBuffer, isStream } from '@crawlee/utils';

/** @internal */
const KVS_KEYS_DEFAULT_LIMIT = 1000;

/**
 * Turns `value` into pretty-printed JSON unless the caller already chose a content type.
 *
 * When `options.contentType` is `null` or `undefined`, it is set (on the passed-in object) to
 * `application/json; charset=utf-8` and the JSON string is returned. Otherwise `value` is
 * returned untouched.
 *
 * @ignore
 */
export const maybeStringify = <T>(value: T, options: { contentType?: string }) => {
    const hasExplicitContentType = options.contentType !== null && options.contentType !== undefined;
    if (hasExplicitContentType) {
        return value;
    }

    // No content type given: the record is going to be stored as JSON.
    options.contentType = 'application/json; charset=utf-8';

    let output: T;
    try {
        // Two-space indentation keeps stored records readable; compression absorbs the overhead.
        output = jsonStringifyExtended(value as Dictionary, null, 2) as unknown as T;
    } catch (e) {
        const error = e as Error;
        // Replace the engine's cryptic message with something actionable.
        if (error.message?.includes('Invalid string length')) {
            error.message = 'Object is too large';
        }
        throw new Error(`The "value" parameter cannot be stringified to JSON: ${error.message}`);
    }

    if (output === undefined) {
        throw new Error(
            'The "value" parameter was stringified to JSON and returned undefined. ' +
                "Make sure you're not trying to stringify an undefined value.",
        );
    }

    return output;
};

/**
* The `KeyValueStore` class represents a key-value store, a simple data storage that is used
* for saving and reading data records or files. Each data record is
Expand Down Expand Up @@ -232,7 +200,53 @@ export class KeyValueStore {
ow(key, ow.string.nonEmpty);
const record = await this.client.getValue(key);

return (record?.value as T) ?? defaultValue ?? null;
// A missing record falls back to the default; a record that parses to a falsy value (including
// a stored literal `null`) is returned verbatim, so callers can tell "stored null" from "absent".
if (!record) {
return defaultValue ?? null;
}

// Storage clients are byte transports — the value is raw bytes; the frontend parses it here.
return parseValue(record.value, record.contentType ?? null) as T;
}

/**
 * Fetches a record from the key-value store and hands it back unparsed.
 *
 * Reach for this when you want the stored value exactly as the storage client returned it,
 * together with its content type — e.g. to feed a parser of your own choice (`simdjson`,
 * a custom XML library, ...) or to forward the payload verbatim.
 *
 * A matching `setRecord` does not exist: {@apilink KeyValueStore.setValue} already lets a
 * `Buffer` (or `string` / `Stream`) through unchanged as long as an explicit `contentType`
 * is given. Pre-serialized bytes can therefore be written with
 * `setValue(key, buffer, { contentType: 'application/json; charset=utf-8' })`.
 *
 * Resolves to `null` when no record is stored under the key.
 *
 * **Example usage:**
 * ```javascript
 * const store = await KeyValueStore.open();
 * const record = await store.getRecord('huge.json');
 * if (record) {
 *     const data = simdjson.parse(record.value);
 * }
 * ```
 *
 * @param key
 *   Unique key of the record. It can be at most 256 characters long and only consist
 *   of the following characters: `a`-`z`, `A`-`Z`, `0`-`9` and `!-_.'()`
 */
async getRecord(key: string): Promise<KeyValueStoreRawRecord | null> {
    checkStorageAccess();
    ow(key, ow.string.nonEmpty);

    const stored = await this.client.getValue(key);
    if (!stored) {
        return null;
    }

    // Normalize a missing content type to `null` so callers get a stable shape.
    const rawRecord: KeyValueStoreRawRecord = {
        value: stored.value,
        contentType: stored.contentType ?? null,
    };
    return rawRecord;
}

/**
Expand Down Expand Up @@ -301,7 +315,10 @@ export class KeyValueStore {
const results: T[] = [];
for (const item of page) {
const record = await this.client.getValue(item.key);
if (record) results.push(mapRecord(item.key, record.value));
if (record) {
const parsed = parseValue(record.value, record.contentType ?? null);
results.push(mapRecord(item.key, parsed));
}
}
yield results;
}
Expand Down Expand Up @@ -375,15 +392,9 @@ export class KeyValueStore {
message: `The "key" argument "${key}" must be at most 256 characters long and only contain the following characters: a-zA-Z0-9!-_.'()`,
})),
);
if (
options.contentType &&
!(
ow.isValid(value, ow.any(ow.string, ow.uint8Array)) ||
(ow.isValid(value, ow.object) && typeof (value as Dictionary).pipe === 'function')
)
) {
if (options.contentType && !(typeof value === 'string' || isBuffer(value) || isStream(value))) {
throw new ArgumentError(
'The "value" parameter must be a String, Buffer or Stream when "options.contentType" is specified.',
'The "value" parameter must be a String, Buffer, ArrayBuffer, TypedArray, or Stream when "options.contentType" is specified.',
this.setValue,
);
}
Expand Down Expand Up @@ -417,12 +428,12 @@ export class KeyValueStore {
// In this case delete the record.
if (value === null) return this.client.deleteValue(key);

value = maybeStringify(value, optionsCopy);
const serialized = serializeValue(value, optionsCopy.contentType);

return this.client.setValue({
key,
value,
contentType: optionsCopy.contentType,
value: serialized.value,
contentType: serialized.contentType,
});
}

Expand Down Expand Up @@ -745,6 +756,23 @@ export class KeyValueStore {
return store.getValue<T>(key, defaultValue as T);
}

/**
 * Fetches an unparsed record from the default {@apilink KeyValueStore} associated with the
 * current crawler run.
 *
 * Convenience shortcut: opens the default store and delegates to
 * {@apilink KeyValueStore.getRecord}. Resolves to `null` if the record does not exist.
 *
 * @param key
 *   Unique key of the record. It can be at most 256 characters long and only consist
 *   of the following characters: `a`-`z`, `A`-`Z`, `0`-`9` and `!-_.'()`
 * @ignore
 */
static async getRecord(key: string): Promise<KeyValueStoreRawRecord | null> {
    const defaultStore = await this.open();
    const rawRecord = defaultStore.getRecord(key);
    return rawRecord;
}

/**
* Tests whether a record with the given key exists in the default {@apilink KeyValueStore} associated with the current crawler run.
* @param key The queried record key.
Expand Down Expand Up @@ -865,6 +893,15 @@ export interface KeyValueStoreOptions {
client: KeyValueStoreClient;
}

/**
 * A raw, unparsed key-value store record as returned by {@apilink KeyValueStore.getRecord}: the
 * verbatim bytes plus the content type, with parsing left to the caller.
 */
export interface KeyValueStoreRawRecord {
    /** The record's value exactly as handed over by the storage client — no decoding or parsing applied. */
    value: Buffer | ArrayBuffer;
    /** Content type reported by the storage client, or `null` when it reported none. */
    contentType: string | null;
}

export interface RecordOptions {
/**
* Specifies a custom MIME content type of the record.
Expand Down
138 changes: 138 additions & 0 deletions packages/core/src/storages/key_value_store_codec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
import type { Dictionary } from '@crawlee/types';
import contentTypeParser from 'content-type';
import { isBuffer, isStream } from '@crawlee/utils';
import JSON5 from 'json5';

import { jsonStringifyExtended } from '@apify/utilities';

/** Media type whose payload is parsed into a JS value rather than returned as text. */
const CONTENT_TYPE_JSON = 'application/json';

/** Media types (matched case-insensitively) whose payload can be decoded into a string. */
const STRINGIFIABLE_CONTENT_TYPE_RXS = [
    // exactly application/json
    new RegExp(`^${CONTENT_TYPE_JSON}$`, 'i'),
    // application/xml, application/atom+xml, ...
    /^application\/.*xml$/i,
    // any text/* type
    /^text\//i,
];

/**
 * Canonical write path for key-value store records.
 *
 * When a content type is provided, the value passes through unchanged — it is the caller's
 * responsibility to supply a String/Buffer/Stream (the frontend validates this).
 *
 * When no content type is provided, it is inferred from the value's shape:
 * - Buffer / typed array / ArrayBuffer / stream → `application/octet-stream` (passthrough)
 * - `string` → `text/plain; charset=utf-8` (passthrough)
 * - anything else → `application/json; charset=utf-8` (serialized via `jsonStringifyExtended`)
 *
 * Does NOT drain streams — that is storage mechanics and stays in the storage client.
 *
 * Backend-independent.
 *
 * @param value The value to store.
 * @param contentType Explicit content type; when `null`/`undefined` it is inferred as described above.
 * @returns The value to hand to the storage client together with the resolved content type.
 * @throws {Error} If the value has to be JSON-serialized and serialization fails or yields `undefined`.
 */
export function serializeValue(
    value: unknown,
    contentType?: string,
): {
    value: Buffer | ArrayBuffer | ArrayBufferView | string | NodeJS.ReadableStream | ReadableStream;
    contentType: string;
} {
    // Explicit content type: the caller owns the representation, nothing is touched.
    if (contentType !== null && contentType !== undefined) {
        return { value: value as Buffer | string | NodeJS.ReadableStream | ReadableStream, contentType };
    }

    if (isStream(value) || isBuffer(value)) {
        return {
            value,
            contentType: 'application/octet-stream',
        };
    }

    if (typeof value === 'string') {
        return { value, contentType: 'text/plain; charset=utf-8' };
    }

    let serialized: string;
    try {
        // Format JSON to simplify debugging, the overhead with compression is negligible
        serialized = jsonStringifyExtended(value as Dictionary, null, 2);
    } catch (e) {
        // Derive the reason without mutating the caught error, and tolerate non-Error throwables.
        const message = e instanceof Error ? e.message : String(e);
        // Give more meaningful error message
        const reason = message?.includes('Invalid string length') ? 'Object is too large' : message;
        throw new Error(`The "value" parameter cannot be stringified to JSON: ${reason}`);
    }

    // `undefined`, functions and symbols have no JSON representation at the top level.
    if (serialized === undefined) {
        throw new Error(
            'The "value" parameter was stringified to JSON and returned undefined. ' +
                "Make sure you're not trying to stringify an undefined value.",
        );
    }

    return { value: serialized, contentType: 'application/json; charset=utf-8' };
}

/**
 * Canonical read path for the {@apilink KeyValueStore} frontend: interprets a stored body
 * according to its content type header. Backend-independent.
 *
 * - `application/json` bodies come back parsed.
 * - `application/*xml` and `text/*` bodies come back as strings.
 * - anything else comes back as the original body.
 *
 * A charset given in the header is honored only when it names an encoding known to
 * Node.js or the browser; otherwise the original body is returned.
 *
 * @param body Stored value; an already-decoded string is accepted as well.
 * @param contentTypeHeader Content type of the record, or `null` when unknown.
 */
export function parseValue(
    body: Buffer | ArrayBuffer | string,
    contentTypeHeader: string | null,
): string | Buffer | ArrayBuffer | Record<string, unknown> {
    // Without a content type there is nothing to base an interpretation on.
    if (contentTypeHeader === null) {
        return body;
    }

    let mimeType: string;
    let encoding: BufferEncoding;
    try {
        const { type, parameters } = contentTypeParser.parse(contentTypeHeader);
        mimeType = type;
        encoding = parameters.charset as BufferEncoding;
    } catch {
        // A header we cannot parse is treated like no header: return the value as-is.
        return body;
    }

    // Unknown media type or unknown charset: better the original value than a mangled one.
    if (!areDataStringifiable(mimeType, encoding)) {
        return body;
    }

    // Strings are already decoded; only raw bytes need to go through the charset.
    let text: string;
    if (typeof body === 'string') {
        text = body;
    } else {
        text = isomorphicBufferToString(body, encoding);
    }

    if (mimeType === CONTENT_TYPE_JSON) {
        return JSON5.parse(text);
    }
    return text;
}

/**
 * Decodes raw bytes into a string in both Node.js and browser environments.
 *
 * Accepts a `Buffer` or an `ArrayBuffer`; plain typed arrays / `DataView`s are tolerated too
 * and decoded from exactly the bytes they view.
 *
 * @param buffer Bytes to decode.
 * @param encoding Charset to decode with; `undefined` falls back to the platform default (UTF-8).
 * @returns The decoded text.
 */
function isomorphicBufferToString(buffer: Buffer | ArrayBuffer, encoding: BufferEncoding): string {
    const hasNodeBuffer = typeof Buffer !== 'undefined';

    // Compare by constructor name so ArrayBuffers created in another realm are recognised too.
    if (buffer.constructor.name === ArrayBuffer.name) {
        // With Buffer available, wrap the ArrayBuffer so every Buffer-supported charset works;
        // without it, fall back to TextDecoder.
        return hasNodeBuffer
            ? Buffer.from(buffer as ArrayBuffer).toString(encoding)
            : new TextDecoder(encoding).decode(new Uint8Array(buffer as ArrayBuffer));
    }

    if (hasNodeBuffer && Buffer.isBuffer(buffer)) {
        return buffer.toString(encoding);
    }

    // A bare typed array's own `toString()` would yield "97,98,99" instead of text, so decode
    // the viewed byte range explicitly.
    if (ArrayBuffer.isView(buffer)) {
        const view = buffer as ArrayBufferView;
        return hasNodeBuffer
            ? Buffer.from(view.buffer, view.byteOffset, view.byteLength).toString(encoding)
            : new TextDecoder(encoding).decode(view);
    }

    // Buffer-like object (e.g. a polyfill): trust its own `toString(encoding)`.
    return (buffer as Buffer).toString(encoding);
}

/**
 * Tells whether bytes in the given charset can be decoded to a string in the current runtime.
 *
 * An empty/missing charset is accepted (the decoder's default, UTF-8, is assumed). Where
 * `Buffer` exists the charset is validated with `Buffer.isEncoding`; elsewhere it is
 * validated by attempting to construct a `TextDecoder` for it.
 *
 * @param charset Charset label from the content type header; may be empty.
 */
function isCharsetStringifiable(charset: string): charset is BufferEncoding {
    if (!charset) return true; // hope that it's utf-8

    if (typeof Buffer !== 'undefined') {
        return Buffer.isEncoding(charset);
    }

    // No Buffer (browser): TextDecoder throws a RangeError for labels it does not know.
    try {
        void new TextDecoder(charset);
        return true;
    } catch {
        return false;
    }
}

/**
 * Tells whether the media type is one whose payload is textual (JSON, XML flavours, `text/*`).
 * An empty media type is never stringifiable, so the raw value is kept.
 */
function isContentTypeStringifiable(contentType: string): boolean {
    if (contentType) {
        for (const pattern of STRINGIFIABLE_CONTENT_TYPE_RXS) {
            if (pattern.test(contentType)) {
                return true;
            }
        }
    }
    return false;
}

/**
 * Data can be turned into a string only if both the media type and the charset allow it.
 * The charset is not inspected when the media type already rules stringification out.
 */
function areDataStringifiable(contentType: string, charset: string): boolean {
    if (!isContentTypeStringifiable(contentType)) {
        return false;
    }
    return isCharsetStringifiable(charset);
}
1 change: 1 addition & 0 deletions packages/fs-storage/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
},
"dependencies": {
"@crawlee/types": "workspace:*",
"@crawlee/utils": "workspace:*",
"@sapphire/async-queue": "^1.5.5",
"@sapphire/shapeshift": "^4.0.0",
"content-type": "^1.0.5",
Expand Down
6 changes: 3 additions & 3 deletions packages/fs-storage/src/fs/key-value-store/fs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,15 @@ export class KeyValueFileSystemEntry implements StorageImplementation<InternalKe

async get(): Promise<InternalKeyRecord> {
await this.fsQueue.wait();
let file: Buffer | string;
let file: Buffer;

try {
file = await readFile(this.filePath);
} catch {
try {
const noExtFilePath = resolve(this.storeDirectory, this.rawRecord.key);
// Try without extension
// Try without extension. Keep the raw bytes — interpretation (text vs. JSON) is the
// KeyValueStore frontend's job; this client is a plain byte transport.
file = await readFile(noExtFilePath);
this.logger?.warning(
[
Expand All @@ -47,7 +48,6 @@ export class KeyValueFileSystemEntry implements StorageImplementation<InternalKe
'If you want to have correct interpretation of the file, you should add a file extension to the entry.',
].join('\n'),
);
file = file.toString('utf-8');
this.filePath = noExtFilePath;
} catch {
// This is impossible to happen, but just in case
Expand Down
Loading
Loading