definenoob/WOLF
🐺 WOLF: World of Line Feed

A high-performance Cloudflare Worker for distributed map-reduce processing of newline-delimited JSON (NDJSON) data stored in S3-compatible object storage.

Overview

WOLF executes user-defined map and reduce functions against large data shards in a sandboxed QuickJS runtime. It streams data directly from S3, processes it in optimized batches, and returns aggregated results—all within the constraints of a serverless edge function.

┌─────────────┐      ┌─────────────┐      ┌─────────────┐
│   S3 Data   │ ───▶ │    WOLF     │ ───▶ │   Result    │
│   Shard     │      │  (QuickJS)  │      │   (JSON)    │
└─────────────┘      └─────────────┘      └─────────────┘
                           │
                     map() │ reduce()
                           ▼
                    User-defined JS

Features

  • Sandboxed Execution — User code runs in QuickJS WASM with memory limits (105MB)
  • Streaming I/O — Processes data as it arrives; no full shard buffering
  • Batch Optimization — Groups 2,000 lines per QuickJS call to minimize FFI overhead
  • Gzip Support — Transparent decompression for .gz suffixed keys
  • Auxiliary Data — Inject lookup tables via db_reduce / db_merge parameters
  • Telemetry — Returns CPU time, S3 latency, line counts, and error metrics

API

Endpoint

POST /
Content-Type: application/json

Request Body

| Field | Type | Required | Description |
|---|---|---|---|
| `bucket` | string | Yes | S3 bucket name |
| `key` | string | Yes | Object key (path) to the NDJSON shard |
| `map` | string | Yes | JavaScript function body for mapping each entry |
| `reduce` | string | Yes | JavaScript function body for reducing mapped values |
| `db_reduce` | object or DbReference | No | Auxiliary data available as `globalThis.dbr` |
| `db_merge` | object or DbReference | No | Auxiliary data available as `globalThis.dbm` |
| `aws_access_key_id` | string | No | Overrides env credentials |
| `aws_secret_access_key` | string | No | Overrides env credentials |
| `aws_region` | string | No | Default: `auto` |
| `aws_endpoint` | string | No | Custom S3 endpoint (e.g., R2, MinIO) |
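A job can be submitted with any HTTP client. A minimal sketch in JavaScript (the worker URL is a placeholder, not a real endpoint):

```javascript
// Build a minimal WOLF job payload. The bucket, key, and function
// bodies here are illustrative values, not defaults of the service.
function buildJob() {
  return {
    bucket: "data-lake",
    key: "logs/2024-01-15.ndjson.gz",
    map: "return { [entry.status]: 1 };",
    reduce:
      "if (a === null) return b; for (var k in b) a[k] = (a[k] || 0) + b[k]; return a;",
  };
}

// POST the job to a deployed WOLF worker and return the parsed result.
async function submitJob(workerUrl, job) {
  const res = await fetch(workerUrl, {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify(job),
  });
  if (!res.ok) throw new Error(`WOLF returned ${res.status}`);
  return res.json();
}
```

Credentials can be added to the payload via `aws_access_key_id` / `aws_secret_access_key` when the worker's environment defaults should be overridden.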

DbReference

Auxiliary data can be provided inline or as an S3 reference:

{
  "bucket": "my-bucket",
  "key": "lookups/categories.json"
}

Response

Streamed JSON with the following structure:

{
  "result": <aggregated_output>,
  "processedLines": 150000,
  "skippedLines": 0,
  "mapErrorCount": 12,
  "reduceErrorCount": 0,
  "s3TimeMs": "842.30",
  "cpuTimeMs": "1523.45",
  "key": "shards/2024/shard-0042.ndjson.gz"
}
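The telemetry fields above can be turned into simple health metrics on the client side; a sketch (field names follow the response schema, but the derived metrics are this example's own):

```javascript
// Summarize a parsed WOLF response body into a few derived metrics.
// Note s3TimeMs and cpuTimeMs arrive as strings and must be parsed.
function summarize(resp) {
  const totalLines = resp.processedLines + resp.skippedLines;
  return {
    key: resp.key,
    parseSkipRate: totalLines ? resp.skippedLines / totalLines : 0,
    mapErrorRate: resp.processedLines
      ? resp.mapErrorCount / resp.processedLines
      : 0,
    totalTimeMs: parseFloat(resp.s3TimeMs) + parseFloat(resp.cpuTimeMs),
  };
}
```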

Writing Map/Reduce Functions

Map Function

Receives each parsed JSON line as entry. Return a value to pass to reduce, or null/undefined to skip.

// Count orders by status
return { [entry.status]: 1 };

// Filter and extract
if (entry.country !== 'US') return null;
return { revenue: entry.amount, count: 1 };

Reduce Function

Combines two values (a = accumulator, b = mapped value). First call has a = null.

// Sum counters
if (a === null) return b;
return {
  revenue: (a.revenue || 0) + (b.revenue || 0),
  count: (a.count || 0) + (b.count || 0)
};

// Merge frequency maps
if (a === null) return b;
for (var k in b) a[k] = (a[k] || 0) + b[k];
return a;
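The contract above can be exercised locally before submitting a job. The harness below is a sketch that mirrors the described semantics (null/undefined map results are skipped, the reduce accumulator starts as null); it is not the worker's actual source:

```javascript
// Local test harness: map each NDJSON line, skip null/undefined
// results, then fold the mapped values with reduce (accumulator
// seeded with null, matching WOLF's first-call behavior).
function runLocal(ndjson, map, reduce) {
  let acc = null;
  for (const line of ndjson.split("\n")) {
    if (!line.trim()) continue;
    const mapped = map(JSON.parse(line));
    if (mapped === null || mapped === undefined) continue;
    acc = reduce(acc, mapped);
  }
  return acc;
}

const sample = '{"status":"ok"}\n{"status":"err"}\n{"status":"ok"}';
const result = runLocal(
  sample,
  (entry) => ({ [entry.status]: 1 }),
  (a, b) => {
    if (a === null) return b;
    for (const k in b) a[k] = (a[k] || 0) + b[k];
    return a;
  }
);
// result: { ok: 2, err: 1 }
```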

Using Auxiliary Data

Access injected data via globalThis.dbr and globalThis.dbm:

// Map: Enrich with category lookup
var category = globalThis.dbr[entry.product_id] || 'unknown';
return { [category]: entry.quantity };
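Lookup-based map functions can also be tested locally by stubbing the injected table on `globalThis` (the table contents here are hypothetical):

```javascript
// Stub the dbr lookup table that WOLF would inject from db_reduce,
// then run the enrichment map function against sample entries.
globalThis.dbr = { "sku-1": "toys", "sku-2": "books" };

function mapFn(entry) {
  const category = globalThis.dbr[entry.product_id] || "unknown";
  return { [category]: entry.quantity };
}

mapFn({ product_id: "sku-1", quantity: 3 }); // → { toys: 3 }
mapFn({ product_id: "sku-9", quantity: 1 }); // → { unknown: 1 }
```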

Examples

Word Frequency Count

{
  "bucket": "data-lake",
  "key": "logs/2024-01-15.ndjson.gz",
  "map": "if (!entry.message) return null; var words = entry.message.toLowerCase().split(/\\W+/); var freq = {}; for (var i = 0; i < words.length; i++) { if (words[i]) freq[words[i]] = 1; } return freq;",
  "reduce": "if (a === null) return b; for (var k in b) a[k] = (a[k] || 0) + b[k]; return a;"
}

Revenue by Region with Lookup

{
  "bucket": "sales-data",
  "key": "transactions/q4.ndjson",
  "map": "var region = globalThis.dbr[entry.store_id]; if (!region) return null; return { [region]: entry.total };",
  "reduce": "if (a === null) return b; for (var k in b) a[k] = (a[k] || 0) + b[k]; return a;",
  "db_reduce": {
    "bucket": "config",
    "key": "store-regions.json"
  }
}

Environment Variables

| Variable | Description |
|---|---|
| `AWS_ACCESS_KEY_ID` | Default S3 access key |
| `AWS_SECRET_ACCESS_KEY` | Default S3 secret key |
| `AWS_REGION` | Default region (fallback: `auto`) |
| `AWS_ENDPOINT_URL` | Custom S3 endpoint |
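In a Wrangler project, non-secret values can go in `wrangler.toml` while credentials are stored as secrets. A sketch (the endpoint URL is a placeholder):

```toml
# wrangler.toml — non-secret configuration
[vars]
AWS_REGION = "auto"
AWS_ENDPOINT_URL = "https://<account-id>.r2.cloudflarestorage.com"
```

Credentials should not live in `wrangler.toml`; set them with `wrangler secret put AWS_ACCESS_KEY_ID` and `wrangler secret put AWS_SECRET_ACCESS_KEY` instead.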

Error Handling

WOLF is resilient to malformed data:

  • Parse errors in individual JSON lines increment skippedLines
  • Map errors increment mapErrorCount and skip the entry
  • Reduce errors increment reduceErrorCount and skip the reduction

Fatal errors (syntax errors in user code, S3 failures) return HTTP 500 with details.

Performance Notes

  • Batch size — 2,000 lines per QuickJS call to minimize FFI overhead
  • Memory limit — 105MB for the QuickJS heap
  • Streaming — results are streamed back as they are generated
  • Garbage collection — pending GC jobs are executed every 50 batches
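The batching strategy above can be sketched as a simple chunking loop (illustrative, not the worker's actual source):

```javascript
// Group incoming NDJSON lines into fixed-size batches so each
// QuickJS invocation amortizes FFI overhead across many lines.
const BATCH_SIZE = 2000;

function* batches(lines, size = BATCH_SIZE) {
  let batch = [];
  for (const line of lines) {
    batch.push(line);
    if (batch.length === size) {
      yield batch;
      batch = [];
    }
  }
  if (batch.length > 0) yield batch; // flush the final partial batch
}
```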

Deployment

# Install dependencies
npm install

# Deploy to Cloudflare
wrangler deploy

Ensure quickjs.wasm is available as a module import.

License

MIT

About

World of Line Feed - Screaming-Fast Map/Reduce at the Edge
