Skip to content

Commit dc663f5

Browse files
Resolve local external payload envelopes
1 parent f90def1 commit dc663f5

8 files changed

Lines changed: 201 additions & 11 deletions

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -495,6 +495,13 @@ curl "$SERVER/api/workflows/order-42/runs/abc123/history" \
495495
- `PUT /api/namespaces/{name}` — Update namespace
496496
- `PUT /api/namespaces/{name}/external-storage` — Configure external payload storage policy
497497

498+
When a namespace enables the `local` external payload storage driver, the
499+
server resolves `{codec, external_storage}` payload envelopes on workflow
500+
start, signal, query, update, bridge-adapter, and activity result/failure
501+
ingress. S3, GCS, and Azure policies can be stored for control-plane parity,
502+
but server-side dereference remains fail-closed until those runtime drivers
503+
ship.
504+
498505
### Workflows
499506
- `GET /api/workflows` — List workflows (with filters)
500507
- `POST /api/workflows` — Start a workflow

app/Http/Controllers/Api/ActivityTaskController.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use App\Models\WorkerRegistration;
66
use App\Support\ActivityTaskPoller;
77
use App\Support\ExternalExecutorConfigContract;
8+
use App\Support\NamespaceExternalPayloadStorage;
89
use App\Support\NamespaceWorkflowScope;
910
use App\Support\WorkerProtocol;
1011
use Illuminate\Http\JsonResponse;
@@ -19,6 +20,7 @@ class ActivityTaskController
1920
{
2021
public function __construct(
2122
private readonly ActivityTaskPoller $activityTaskPoller,
23+
private readonly NamespaceExternalPayloadStorage $externalPayloadStorage,
2224
) {}
2325

2426
/**
@@ -125,6 +127,7 @@ public function complete(Request $request, string $taskId): JsonResponse
125127
$resolved = PayloadEnvelopeResolver::resolveCommandPayloadWithCodec(
126128
$validated['result'] ?? null,
127129
'result',
130+
$this->externalPayloadStorage->driverFor($namespace),
128131
);
129132
$outcome = $bridge->complete(
130133
$validated['activity_attempt_id'],
@@ -206,6 +209,7 @@ public function fail(Request $request, string $taskId): JsonResponse
206209
$resolved = PayloadEnvelopeResolver::resolveCommandPayloadWithCodec(
207210
$validated['failure']['details'] ?? null,
208211
'failure.details',
212+
$this->externalPayloadStorage->driverFor($namespace),
209213
);
210214
$failure = $validated['failure'];
211215

app/Http/Controllers/Api/BridgeAdapterController.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
use App\Support\BridgeAdapterOutcomeContract;
66
use App\Support\ControlPlaneProtocol;
7+
use App\Support\NamespaceExternalPayloadStorage;
78
use App\Support\NamespaceWorkflowScope;
89
use App\Support\WorkflowCommandContextFactory;
910
use App\Support\WorkflowStartService;
@@ -22,6 +23,7 @@ public function __construct(
2223
private readonly WorkflowStartService $workflowStartService,
2324
private readonly WorkflowControlPlane $workflowControlPlane,
2425
private readonly WorkflowCommandContextFactory $commandContexts,
26+
private readonly NamespaceExternalPayloadStorage $externalPayloadStorage,
2527
) {}
2628

2729
public function webhook(Request $request, string $adapter): JsonResponse
@@ -205,7 +207,8 @@ private function signalWorkflow(
205207
]);
206208
}
207209

208-
$envelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null, 'input');
210+
$externalStorage = $this->externalPayloadStorage->driverFor($namespace);
211+
$envelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null, 'input', $externalStorage);
209212

210213
$duplicate = $this->duplicateBridgeCommand(
211214
workflowId: $workflowId,
@@ -230,7 +233,7 @@ private function signalWorkflow(
230233
}
231234

232235
$result = $this->workflowControlPlane->signal($workflowId, $signalName, [
233-
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input'),
236+
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input', $externalStorage),
234237
'payload_codec' => $envelope['codec'],
235238
'payload_blob' => $envelope['blob'],
236239
'command_context' => $this->commandContexts->make(
@@ -315,7 +318,11 @@ private function updateWorkflow(
315318
}
316319

317320
$result = $this->workflowControlPlane->update($workflowId, $updateName, [
318-
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input'),
321+
'arguments' => PayloadEnvelopeResolver::resolveToArray(
322+
$validated['input'] ?? null,
323+
'input',
324+
$this->externalPayloadStorage->driverFor($namespace),
325+
),
319326
'command_context' => $this->commandContexts->make(
320327
$request,
321328
workflowId: $workflowId,

app/Http/Controllers/Api/WorkflowController.php

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use App\Support\ControlPlaneProtocol;
66
use App\Support\ControlPlaneResponseContract;
77
use App\Support\ControlPlaneResultMapper;
8+
use App\Support\NamespaceExternalPayloadStorage;
89
use App\Support\NamespaceWorkflowScope;
910
use App\Support\WorkflowCommandContextFactory;
1011
use App\Support\WorkflowQueryTaskBroker;
@@ -30,6 +31,7 @@ public function __construct(
3031
private readonly ControlPlaneResultMapper $resultMapper,
3132
private readonly WorkflowQueryTaskBroker $queryTasks,
3233
private readonly WorkflowRunDiagnostics $diagnostics,
34+
private readonly NamespaceExternalPayloadStorage $externalPayloadStorage,
3335
) {}
3436

3537
public function index(Request $request): JsonResponse
@@ -383,13 +385,14 @@ public function signal(Request $request, string $workflowId, string $signalName)
383385
'request_id' => ['nullable', 'string', 'max:255'],
384386
]);
385387

386-
$envelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null, 'input');
388+
$externalStorage = $this->externalPayloadStorage->driverFor($namespace);
389+
$envelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null, 'input', $externalStorage);
387390

388391
$result = $this->workflowControlPlane->signal(
389392
$workflowId,
390393
$signalName,
391394
[
392-
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input'),
395+
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input', $externalStorage),
393396
'payload_codec' => $envelope['codec'],
394397
'payload_blob' => $envelope['blob'],
395398
'command_context' => $this->commandContexts->make(
@@ -435,7 +438,8 @@ public function query(Request $request, string $workflowId, string $queryName):
435438
]);
436439

437440
$run = NamespaceWorkflowScope::currentRun($namespace, $workflowId);
438-
$queryEnvelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null, 'input');
441+
$externalStorage = $this->externalPayloadStorage->driverFor($namespace);
442+
$queryEnvelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null, 'input', $externalStorage);
439443

440444
if ($run instanceof WorkflowRun && $this->queryTasks->hasWorkerFor($namespace, $run)) {
441445
return $this->resultMapper->query(
@@ -450,7 +454,7 @@ public function query(Request $request, string $workflowId, string $queryName):
450454
$workflowId,
451455
$queryName,
452456
[
453-
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input'),
457+
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input', $externalStorage),
454458
'command_context' => $this->commandContexts->make(
455459
$request,
456460
workflowId: $workflowId,
@@ -496,12 +500,13 @@ public function update(Request $request, string $workflowId, string $updateName)
496500
]);
497501

498502
$this->rejectLegacyUpdateFields($request);
503+
$externalStorage = $this->externalPayloadStorage->driverFor($namespace);
499504

500505
$result = $this->workflowControlPlane->update(
501506
$workflowId,
502507
$updateName,
503508
[
504-
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input'),
509+
'arguments' => PayloadEnvelopeResolver::resolveToArray($validated['input'] ?? null, 'input', $externalStorage),
505510
'command_context' => $this->commandContexts->make(
506511
$request,
507512
workflowId: $workflowId,
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
use App\Models\WorkflowNamespace;
6+
use Workflow\V2\Contracts\ExternalPayloadStorageDriver;
7+
use Workflow\V2\Support\LocalFilesystemExternalPayloadStorage;
8+
9+
class NamespaceExternalPayloadStorage
10+
{
11+
public function driverFor(?string $namespace): ?ExternalPayloadStorageDriver
12+
{
13+
$namespace = $namespace ?: (string) config('server.default_namespace', 'default');
14+
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
15+
$policy = is_array($ns?->external_payload_storage) ? $ns->external_payload_storage : [];
16+
17+
if ($policy === [] || ($policy['enabled'] ?? true) === false) {
18+
return null;
19+
}
20+
21+
$driver = $policy['driver'] ?? null;
22+
23+
if ($driver !== 'local') {
24+
return null;
25+
}
26+
27+
return new LocalFilesystemExternalPayloadStorage($this->localRoot($policy, $namespace));
28+
}
29+
30+
/**
31+
* @param array<string, mixed> $policy
32+
*/
33+
private function localRoot(array $policy, string $namespace): string
34+
{
35+
$uri = $policy['config']['uri'] ?? null;
36+
37+
if (is_string($uri) && str_starts_with($uri, 'file://')) {
38+
return rtrim(substr($uri, 7), '/');
39+
}
40+
41+
return storage_path('app/external-payloads/'.$namespace);
42+
}
43+
}

app/Support/WorkflowStartService.php

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ class WorkflowStartService
1313
public function __construct(
1414
private readonly WorkflowControlPlane $controlPlane,
1515
private readonly ConfiguredWorkflowTypeValidator $workflowTypes,
16+
private readonly NamespaceExternalPayloadStorage $externalPayloadStorage,
1617
) {}
1718

1819
/**
@@ -66,7 +67,11 @@ private function startRemoteWorkflow(
6667
?string $namespace = null,
6768
?CommandContext $commandContext = null,
6869
): array {
69-
$envelope = PayloadEnvelopeResolver::resolve($validated['input'] ?? null);
70+
$envelope = PayloadEnvelopeResolver::resolve(
71+
$validated['input'] ?? null,
72+
'input',
73+
$this->externalPayloadStorage->driverFor($namespace),
74+
);
7075

7176
// When the client sends no input (or an empty array), emit a
7277
// default-codec-encoded empty arg list so the run's `arguments`

composer.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

0 commit comments

Comments
 (0)