Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/aws-world-initial.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/world-aws': patch
---

Add AWS World implementation using DynamoDB for storage and SQS for message queuing
165 changes: 165 additions & 0 deletions packages/world-aws/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
# @workflow/world-aws

An AWS World implementation for the [Workflow DevKit](https://useworkflow.dev), using **DynamoDB** for storage and **SQS** for message queuing.

## Architecture

This package uses AWS managed services to provide a production-ready, serverless-compatible World backend:

| Concern | AWS Service | Details |
|---------|-------------|---------|
| **Storage** | DynamoDB | All entities (runs, events, steps, hooks, waits, stream chunks) are stored in DynamoDB tables with on-demand billing |
| **Queue** | SQS | Standard queues with per-message delay (up to 15 min) for workflow and step invocations |
| **Streaming** | DynamoDB | Stream chunks stored in DynamoDB with polling-based real-time delivery |

### Why SQS over Postgres-backed queues?

- **Fully managed** - No connection pools, no worker processes to manage
- **Per-message delay** - Native support for delayed delivery (up to 15 minutes), useful for step retries
- **Elastic scaling** - Automatically scales with traffic, no need to provision workers
- **Dead letter queues** - Built-in support for failed message routing

## Installation

```bash
npm install @workflow/world-aws
# or
pnpm add @workflow/world-aws
```

## Configuration

### Environment Variables

| Variable | Description | Default |
|----------|-------------|---------|
| `AWS_REGION` | AWS region | `us-east-1` |
| `WORKFLOW_AWS_TABLE_PREFIX` | DynamoDB table name prefix | `workflow` |
| `WORKFLOW_AWS_SQS_WORKFLOW_QUEUE_URL` | SQS queue URL for workflow invocations | (required) |
| `WORKFLOW_AWS_SQS_STEP_QUEUE_URL` | SQS queue URL for step invocations | (required) |
| `WORKFLOW_AWS_QUEUE_CONCURRENCY` | Max concurrent message processing | `10` |
| `WORKFLOW_AWS_POLL_INTERVAL_MS` | SQS polling interval (ms) | `1000` |
| `WORKFLOW_AWS_DYNAMODB_ENDPOINT` | Custom DynamoDB endpoint (for local dev) | - |
| `WORKFLOW_AWS_SQS_ENDPOINT` | Custom SQS endpoint (for local dev) | - |

### Programmatic Configuration

```typescript
import { createWorld } from '@workflow/world-aws';

const world = createWorld({
region: 'us-west-2',
tablePrefix: 'myapp',
sqsWorkflowQueueUrl: 'https://sqs.us-west-2.amazonaws.com/123456789/myapp-workflows',
sqsStepQueueUrl: 'https://sqs.us-west-2.amazonaws.com/123456789/myapp-steps',
queueConcurrency: 20,
});
```

## Setup

### 1. Create DynamoDB Tables

Run the setup CLI to create all required DynamoDB tables:

```bash
npx workflow-aws-setup
```

This creates six tables with on-demand (PAY_PER_REQUEST) billing:
- `{prefix}_runs` - Workflow run entities
- `{prefix}_events` - Append-only event log
- `{prefix}_steps` - Step entities
- `{prefix}_hooks` - Webhook/notification hooks
- `{prefix}_waits` - Durable delay/sleep tracking
- `{prefix}_streams` - Streaming output chunks

### 2. Create SQS Queues

Create two standard SQS queues in your AWS account:

```bash
aws sqs create-queue --queue-name myapp-workflows
aws sqs create-queue --queue-name myapp-steps
```

Set the queue URLs in your environment or configuration.

### 3. IAM Permissions

The following IAM permissions are required:

```json
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:UpdateItem",
"dynamodb:DeleteItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:BatchWriteItem",
"dynamodb:DescribeTable",
"dynamodb:CreateTable"
],
"Resource": "arn:aws:dynamodb:*:*:table/workflow_*"
},
{
"Effect": "Allow",
"Action": [
"sqs:SendMessage",
"sqs:ReceiveMessage",
"sqs:DeleteMessage",
"sqs:GetQueueAttributes"
],
"Resource": "arn:aws:sqs:*:*:myapp-*"
}
]
}
```

## Local Development

For local development, use [LocalStack](https://localstack.cloud/) or [DynamoDB Local](https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/DynamoDBLocal.html):

```bash
# Start LocalStack
docker run -d -p 4566:4566 localstack/localstack

# Configure endpoints
export WORKFLOW_AWS_DYNAMODB_ENDPOINT=http://localhost:4566
export WORKFLOW_AWS_SQS_ENDPOINT=http://localhost:4566

# Create tables
npx workflow-aws-setup

# Create SQS queues
aws --endpoint-url http://localhost:4566 sqs create-queue --queue-name workflow-workflows
aws --endpoint-url http://localhost:4566 sqs create-queue --queue-name workflow-steps
```

## Usage with Next.js

```typescript
// workflow.config.ts
import { createWorld } from '@workflow/world-aws';

export const world = createWorld();
```

## DynamoDB Table Design

All tables use on-demand capacity (PAY_PER_REQUEST) and include Global Secondary Indexes for efficient querying:

- **Runs**: PK=`runId`, GSIs on `workflowName` and `status`
- **Events**: PK=`runId`, SK=`eventId`, GSI on `correlationId`
- **Steps**: PK=`stepId`, GSIs on `runId` and `status`
- **Hooks**: PK=`hookId`, GSIs on `runId` and `token`
- **Waits**: PK=`waitId`, GSI on `runId`
- **Streams**: PK=`streamId`, SK=`chunkId`, GSI on `runId`

Binary data (inputs, outputs, errors) is encoded using CBOR for efficient storage in DynamoDB's binary attribute type.
2 changes: 2 additions & 0 deletions packages/world-aws/bin/setup.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
#!/usr/bin/env node
import '../dist/cli.js';
68 changes: 68 additions & 0 deletions packages/world-aws/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
{
"name": "@workflow/world-aws",
"version": "4.1.0-beta.1",
"description": "An AWS World implementation using DynamoDB and SQS",
"type": "module",
"main": "dist/index.js",
"bin": {
"workflow-aws-setup": "./bin/setup.js"
},
"files": [
"dist",
"bin"
],
"publishConfig": {
"access": "public"
},
"license": "Apache-2.0",
"repository": {
"type": "git",
"url": "https://github.com/vercel/workflow.git",
"directory": "packages/world-aws"
},
"exports": {
".": {
"types": "./dist/index.d.ts",
"default": "./dist/index.js"
},
"./cli": {
"types": "./dist/cli.d.ts",
"default": "./dist/cli.js"
}
},
"scripts": {
"build": "tsc",
"dev": "tsc --watch",
"clean": "tsc --build --clean && rm -rf dist",
"test": "vitest run",
"typecheck": "tsc --noEmit"
},
"dependencies": {
"@aws-sdk/client-dynamodb": "3.750.0",
"@aws-sdk/client-sqs": "3.750.0",
"@aws-sdk/client-s3": "3.750.0",
"@aws-sdk/util-dynamodb": "3.750.0",
"@vercel/queue": "catalog:",
"@workflow/errors": "workspace:*",
"@workflow/world": "workspace:*",
"@workflow/world-local": "workspace:*",
"cbor-x": "1.6.0",
"ulid": "catalog:",
"zod": "catalog:"
},
"devDependencies": {
"@types/node": "catalog:",
"@workflow/tsconfig": "workspace:*",
"@workflow/world-testing": "workspace:*",
"vitest": "catalog:"
},
"keywords": [
"aws",
"dynamodb",
"sqs",
"workflow",
"durable-functions"
],
"author": "",
"packageManager": "pnpm@10.15.1"
}
40 changes: 40 additions & 0 deletions packages/world-aws/src/cli.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import { DynamoDBClient } from '@aws-sdk/client-dynamodb';
import { resolveConfig, tableNames } from './config.js';
import { ensureTables } from './dynamo.js';

async function main() {
console.log('[workflow-aws-setup] Setting up DynamoDB tables...');

const config = resolveConfig();
const tables = tableNames(config.tablePrefix);

console.log(`[workflow-aws-setup] Region: ${config.region}`);
console.log(`[workflow-aws-setup] Table prefix: ${config.tablePrefix}`);
if (config.dynamoDbEndpoint) {
console.log(
`[workflow-aws-setup] DynamoDB endpoint: ${config.dynamoDbEndpoint}`
);
}
console.log('[workflow-aws-setup] Tables to create:');
for (const [key, name] of Object.entries(tables)) {
console.log(` - ${key}: ${name}`);
}

const client = new DynamoDBClient({
region: config.region,
endpoint: config.dynamoDbEndpoint,
...config.dynamoDbConfig,
});

try {
await ensureTables(client, config.tablePrefix);
console.log('[workflow-aws-setup] All tables created successfully.');
} catch (err) {
console.error('[workflow-aws-setup] Failed to create tables:', err);
process.exit(1);
} finally {
client.destroy();
}
}

main();
99 changes: 99 additions & 0 deletions packages/world-aws/src/config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import type { DynamoDBClientConfig } from '@aws-sdk/client-dynamodb';
import type { SQSClientConfig } from '@aws-sdk/client-sqs';
import type { S3ClientConfig } from '@aws-sdk/client-s3';

export interface AwsWorldConfig {
/** AWS region (default: from AWS_REGION env var or 'us-east-1') */
region?: string;

/** Prefix for all DynamoDB table names (default: 'workflow') */
tablePrefix?: string;

/**
* SQS queue URLs. If not provided, they will be derived from the queue prefix
* and region. For FIFO queues, append '.fifo' to the URL.
*/
sqsWorkflowQueueUrl?: string;
sqsStepQueueUrl?: string;

/** Concurrency limit for SQS message polling (default: 10) */
queueConcurrency?: number;

/** Polling interval in milliseconds for SQS long-polling (default: 1000) */
pollIntervalMs?: number;

/** Optional custom DynamoDB client configuration */
dynamoDbConfig?: DynamoDBClientConfig;

/** Optional custom SQS client configuration */
sqsConfig?: SQSClientConfig;

/** Optional custom S3 client configuration (for stream chunks, optional) */
s3Config?: S3ClientConfig;

/**
* Optional DynamoDB endpoint override (useful for local development with
* DynamoDB Local or LocalStack)
*/
dynamoDbEndpoint?: string;

/** Optional SQS endpoint override */
sqsEndpoint?: string;
}

export function resolveConfig(
config?: Partial<AwsWorldConfig>
): Required<
Pick<
AwsWorldConfig,
'region' | 'tablePrefix' | 'queueConcurrency' | 'pollIntervalMs'
>
> &
AwsWorldConfig {
const region =
config?.region ||
process.env.AWS_REGION ||
process.env.AWS_DEFAULT_REGION ||
'us-east-1';

const tablePrefix =
config?.tablePrefix || process.env.WORKFLOW_AWS_TABLE_PREFIX || 'workflow';

const queueConcurrency =
config?.queueConcurrency ||
parseInt(process.env.WORKFLOW_AWS_QUEUE_CONCURRENCY || '10', 10) ||
10;

const pollIntervalMs =
config?.pollIntervalMs ||
parseInt(process.env.WORKFLOW_AWS_POLL_INTERVAL_MS || '1000', 10) ||
1000;

return {
...config,
region,
tablePrefix,
queueConcurrency,
pollIntervalMs,
sqsWorkflowQueueUrl:
config?.sqsWorkflowQueueUrl ||
process.env.WORKFLOW_AWS_SQS_WORKFLOW_QUEUE_URL,
sqsStepQueueUrl:
config?.sqsStepQueueUrl || process.env.WORKFLOW_AWS_SQS_STEP_QUEUE_URL,
dynamoDbEndpoint:
config?.dynamoDbEndpoint || process.env.WORKFLOW_AWS_DYNAMODB_ENDPOINT,
sqsEndpoint: config?.sqsEndpoint || process.env.WORKFLOW_AWS_SQS_ENDPOINT,
};
}

/** DynamoDB table names derived from a prefix. */
export function tableNames(prefix: string) {
return {
runs: `${prefix}_runs`,
events: `${prefix}_events`,
steps: `${prefix}_steps`,
hooks: `${prefix}_hooks`,
waits: `${prefix}_waits`,
streams: `${prefix}_streams`,
} as const;
}
Loading