Skip to content

Commit a70ceea

Browse files
Add configured object external payload storage
Add object external payload storage driver
1 parent 7a357a6 commit a70ceea

6 files changed

Lines changed: 252 additions & 29 deletions

File tree

app/Http/Controllers/Api/NamespaceController.php

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ public function updateExternalStorage(Request $request, string $namespace): Json
141141
'config' => ['nullable', 'array'],
142142
'config.uri' => ['nullable', 'string', 'max:2048'],
143143
'config.bucket' => ['nullable', 'string', 'max:255'],
144+
'config.disk' => ['nullable', 'string', 'max:255'],
144145
'config.prefix' => ['nullable', 'string', 'max:1024'],
145146
'config.region' => ['nullable', 'string', 'max:128'],
146147
'config.endpoint' => ['nullable', 'string', 'max:2048'],

app/Http/Controllers/Api/StorageController.php

Lines changed: 26 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,18 @@
44

55
use App\Models\WorkflowNamespace;
66
use App\Support\ControlPlaneProtocol;
7+
use App\Support\NamespaceExternalPayloadStorage;
78
use Illuminate\Http\JsonResponse;
89
use Illuminate\Http\Request;
9-
use Illuminate\Support\Facades\File;
10-
use Illuminate\Support\Str;
1110
use Illuminate\Validation\Rule;
11+
use Workflow\V2\Contracts\ExternalPayloadStorageDriver;
1212

1313
class StorageController
1414
{
15+
public function __construct(
16+
private readonly NamespaceExternalPayloadStorage $externalPayloadStorage,
17+
) {}
18+
1519
public function test(Request $request): JsonResponse
1620
{
1721
if ($response = ControlPlaneProtocol::rejectUnsupported($request)) {
@@ -37,24 +41,34 @@ public function test(Request $request): JsonResponse
3741
return $this->diagnosticError('external_storage_disabled', 'External payload storage is disabled for this namespace.', $namespace, $driver);
3842
}
3943

40-
if ($driver !== 'local') {
44+
if ($driver !== ($policy['driver'] ?? null)) {
4145
return $this->diagnosticError(
4246
'storage_driver_unavailable',
43-
'The server can persist this storage policy, but only the local diagnostic driver can run a round-trip test in this release.',
47+
'The requested external payload storage driver is not configured for this namespace.',
4448
$namespace,
4549
$driver,
4650
['supported_diagnostic_drivers' => ['local']],
4751
);
4852
}
4953

50-
$directory = $this->localDirectory($policy, $namespace);
54+
$storage = $this->externalPayloadStorage->driverFor($namespace);
55+
56+
if ($storage === null) {
57+
return $this->diagnosticError(
58+
'storage_driver_unavailable',
59+
'The server can persist this storage policy, but the configured storage driver is not available in this runtime.',
60+
$namespace,
61+
$driver,
62+
['supported_diagnostic_drivers' => ['local', 's3', 'gcs', 'azure']],
63+
);
64+
}
5165

5266
return ControlPlaneProtocol::json([
5367
'status' => 'passed',
5468
'namespace' => $namespace,
5569
'driver' => $driver,
56-
'small_payload' => $this->roundTrip($directory, 'small', (int) $validated['small_payload_bytes']),
57-
'large_payload' => $this->roundTrip($directory, 'large', (int) $validated['large_payload_bytes']),
70+
'small_payload' => $this->roundTrip($storage, 'small', (int) $validated['small_payload_bytes']),
71+
'large_payload' => $this->roundTrip($storage, 'large', (int) $validated['large_payload_bytes']),
5872
]);
5973
}
6074

@@ -74,27 +88,13 @@ private function diagnosticError(
7488
] + $extra, 422);
7589
}
7690

77-
private function localDirectory(array $policy, string $namespace): string
91+
private function roundTrip(ExternalPayloadStorageDriver $storage, string $kind, int $bytes): array
7892
{
79-
$uri = $policy['config']['uri'] ?? null;
80-
if (is_string($uri) && str_starts_with($uri, 'file://')) {
81-
return rtrim(substr($uri, 7), '/');
82-
}
83-
84-
return storage_path('app/external-payloads/'.$namespace);
85-
}
86-
87-
private function roundTrip(string $directory, string $kind, int $bytes): array
88-
{
89-
File::ensureDirectoryExists($directory);
90-
91-
$path = $directory.'/storage-test-'.$kind.'-'.(string) Str::uuid().'.bin';
9293
$payload = str_repeat($kind === 'small' ? 's' : 'l', $bytes);
9394
$expectedHash = hash('sha256', $payload);
94-
95-
file_put_contents($path, $payload);
96-
$read = file_get_contents($path);
97-
@unlink($path);
95+
$uri = $storage->put($payload, $expectedHash, 'storage-test-'.$kind);
96+
$read = $storage->get($uri);
97+
$storage->delete($uri);
9898

9999
if ($read !== $payload) {
100100
throw new \RuntimeException('External storage round trip failed integrity verification.');
@@ -104,7 +104,7 @@ private function roundTrip(string $directory, string $kind, int $bytes): array
104104
'status' => 'passed',
105105
'bytes' => $bytes,
106106
'sha256' => $expectedHash,
107-
'reference_uri' => 'file://'.$path,
107+
'reference_uri' => $uri,
108108
];
109109
}
110110
}
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
use Illuminate\Support\Facades\Storage;
6+
use InvalidArgumentException;
7+
use RuntimeException;
8+
use Workflow\V2\Contracts\ExternalPayloadStorageDriver;
9+
10+
class FilesystemExternalPayloadStorage implements ExternalPayloadStorageDriver
11+
{
12+
public function __construct(
13+
private readonly string $disk,
14+
private readonly string $scheme,
15+
private readonly string $bucket,
16+
private readonly string $prefix = '',
17+
) {}
18+
19+
public function put(string $data, string $sha256, string $codec): string
20+
{
21+
$this->validateSha256($sha256);
22+
23+
$key = $this->keyFor($sha256, $codec);
24+
25+
if (Storage::disk($this->disk)->put($key, $data) === false) {
26+
throw new RuntimeException(sprintf('Unable to write external payload [%s].', $this->uriFor($key)));
27+
}
28+
29+
return $this->uriFor($key);
30+
}
31+
32+
public function get(string $uri): string
33+
{
34+
$key = $this->keyFromUri($uri);
35+
36+
if (! Storage::disk($this->disk)->exists($key)) {
37+
throw new RuntimeException(sprintf('Unable to read external payload [%s].', $uri));
38+
}
39+
40+
return Storage::disk($this->disk)->get($key);
41+
}
42+
43+
public function delete(string $uri): void
44+
{
45+
$key = $this->keyFromUri($uri);
46+
47+
if (Storage::disk($this->disk)->exists($key)) {
48+
Storage::disk($this->disk)->delete($key);
49+
}
50+
}
51+
52+
private function keyFor(string $sha256, string $codec): string
53+
{
54+
$codecSegment = $this->safeCodecSegment($codec);
55+
56+
return $this->prefix.$codecSegment.'/'.substr($sha256, 0, 2).'/'.$sha256;
57+
}
58+
59+
private function uriFor(string $key): string
60+
{
61+
return $this->scheme.'://'.$this->bucket.'/'.$key;
62+
}
63+
64+
private function keyFromUri(string $uri): string
65+
{
66+
$parts = parse_url($uri);
67+
68+
if (($parts['scheme'] ?? null) !== $this->scheme) {
69+
throw new InvalidArgumentException(sprintf(
70+
'External payload URI scheme must be [%s].',
71+
$this->scheme,
72+
));
73+
}
74+
75+
if (($parts['host'] ?? '') !== $this->bucket) {
76+
throw new InvalidArgumentException(sprintf(
77+
'External payload URI bucket must be [%s].',
78+
$this->bucket,
79+
));
80+
}
81+
82+
$key = ltrim(rawurldecode($parts['path'] ?? ''), '/');
83+
84+
if ($key === '' || str_contains($key, '..') || ! str_starts_with($key, $this->prefix)) {
85+
throw new InvalidArgumentException('External payload URI is outside the configured storage prefix.');
86+
}
87+
88+
return $key;
89+
}
90+
91+
private function validateSha256(string $sha256): void
92+
{
93+
if (! preg_match('/\A[a-f0-9]{64}\z/i', $sha256)) {
94+
throw new InvalidArgumentException('sha256 must be a hex digest.');
95+
}
96+
}
97+
98+
private function safeCodecSegment(string $codec): string
99+
{
100+
if (! preg_match('/\A[A-Za-z0-9_.-]+\z/', $codec)) {
101+
throw new InvalidArgumentException('Codec contains characters that are unsafe for external storage keys.');
102+
}
103+
104+
return $codec;
105+
}
106+
}

app/Support/NamespaceExternalPayloadStorage.php

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,27 @@ public function driverFor(?string $namespace): ?ExternalPayloadStorageDriver
2020

2121
$driver = $policy['driver'] ?? null;
2222

23-
if ($driver !== 'local') {
24-
return null;
23+
if ($driver === 'local') {
24+
return new LocalFilesystemExternalPayloadStorage($this->localRoot($policy, $namespace));
2525
}
2626

27-
return new LocalFilesystemExternalPayloadStorage($this->localRoot($policy, $namespace));
27+
if (in_array($driver, ['s3', 'gcs', 'azure'], true)) {
28+
$disk = $policy['config']['disk'] ?? null;
29+
$bucket = $policy['config']['bucket'] ?? null;
30+
31+
if (! is_string($disk) || $disk === '' || ! is_string($bucket) || $bucket === '') {
32+
return null;
33+
}
34+
35+
return new FilesystemExternalPayloadStorage(
36+
disk: $disk,
37+
scheme: $driver,
38+
bucket: $bucket,
39+
prefix: $this->prefix($policy),
40+
);
41+
}
42+
43+
return null;
2844
}
2945

3046
/**
@@ -40,4 +56,18 @@ private function localRoot(array $policy, string $namespace): string
4056

4157
return storage_path('app/external-payloads/'.$namespace);
4258
}
59+
60+
/**
61+
* @param array<string, mixed> $policy
62+
*/
63+
private function prefix(array $policy): string
64+
{
65+
$prefix = $policy['config']['prefix'] ?? '';
66+
67+
if (! is_string($prefix) || $prefix === '') {
68+
return '';
69+
}
70+
71+
return trim($prefix, '/').'/';
72+
}
4373
}

tests/Feature/ExternalPayloadStorageTest.php

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use App\Support\ControlPlaneProtocol;
77
use Illuminate\Foundation\Testing\RefreshDatabase;
88
use Illuminate\Support\Facades\File;
9+
use Illuminate\Support\Facades\Storage;
910
use Tests\TestCase;
1011

1112
class ExternalPayloadStorageTest extends TestCase
@@ -106,6 +107,41 @@ public function test_local_storage_diagnostic_round_trips_small_and_large_payloa
106107
$this->assertSame([], glob($this->storageDirectory.'/storage-test-*.bin') ?: []);
107108
}
108109

110+
public function test_configured_object_storage_diagnostic_round_trips_payloads(): void
111+
{
112+
Storage::fake('external-payload-objects');
113+
114+
$this->createNamespace('billing', [
115+
'driver' => 's3',
116+
'enabled' => true,
117+
'threshold_bytes' => 32,
118+
'config' => [
119+
'disk' => 'external-payload-objects',
120+
'bucket' => 'dw-payloads',
121+
'prefix' => 'diagnostics/',
122+
],
123+
]);
124+
125+
$response = $this->postJson('/api/storage/test', [
126+
'small_payload_bytes' => 16,
127+
'large_payload_bytes' => 64,
128+
], ['X-Namespace' => 'billing']);
129+
130+
$smallHash = hash('sha256', str_repeat('s', 16));
131+
132+
$response->assertOk()
133+
->assertJsonPath('status', 'passed')
134+
->assertJsonPath('namespace', 'billing')
135+
->assertJsonPath('driver', 's3')
136+
->assertJsonPath('small_payload.status', 'passed')
137+
->assertJsonPath('small_payload.bytes', 16)
138+
->assertJsonPath('small_payload.reference_uri', 's3://dw-payloads/diagnostics/storage-test-small/'.substr($smallHash, 0, 2).'/'.$smallHash)
139+
->assertJsonPath('large_payload.status', 'passed')
140+
->assertJsonPath('large_payload.bytes', 64);
141+
142+
$this->assertSame([], Storage::disk('external-payload-objects')->allFiles());
143+
}
144+
109145
public function test_storage_diagnostic_reports_unconfigured_and_unavailable_drivers(): void
110146
{
111147
$this->createNamespace('default');

tests/Feature/HistoryRetentionTest.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
use Illuminate\Support\Facades\Cache;
1111
use Illuminate\Support\Facades\File;
1212
use Illuminate\Support\Facades\Queue;
13+
use Illuminate\Support\Facades\Storage;
1314
use Tests\Feature\Concerns\ServerTestHelpers;
1415
use Tests\Fixtures\ExternalGreetingWorkflow;
1516
use Tests\TestCase;
@@ -332,6 +333,55 @@ public function test_retention_pass_blocks_external_payload_prune_when_driver_is
332333
$this->assertNotNull(WorkflowRunSummary::find($runId));
333334
}
334335

336+
public function test_retention_pass_deletes_configured_object_storage_references(): void
337+
{
338+
Queue::fake();
339+
Storage::fake('retention-object-payloads');
340+
341+
$this->createNamespace('default');
342+
WorkflowNamespace::where('name', 'default')->update([
343+
'external_payload_storage' => [
344+
'driver' => 's3',
345+
'enabled' => true,
346+
'config' => [
347+
'disk' => 'retention-object-payloads',
348+
'bucket' => 'dw-payloads',
349+
'prefix' => 'retention/',
350+
],
351+
],
352+
]);
353+
354+
$runId = $this->createExpiredClosedRun('default', 'wf-prune-external-s3-configured');
355+
$payload = 'large encoded payload bytes';
356+
$key = 'retention/avro/'.substr(hash('sha256', $payload), 0, 2).'/'.hash('sha256', $payload);
357+
358+
Storage::disk('retention-object-payloads')->put($key, $payload);
359+
360+
WorkflowHistoryEvent::query()->create([
361+
'workflow_run_id' => $runId,
362+
'sequence' => 999,
363+
'event_type' => HistoryEventType::ActivityCompleted->value,
364+
'payload' => [
365+
'result' => [
366+
'external_storage' => $this->externalStorageReference('s3://dw-payloads/'.$key, $payload),
367+
],
368+
],
369+
'recorded_at' => now(),
370+
]);
371+
372+
Storage::disk('retention-object-payloads')->assertExists($key);
373+
374+
$this->withHeaders($this->apiHeaders())
375+
->postJson('/api/system/retention/pass')
376+
->assertOk()
377+
->assertJsonPath('processed', 1)
378+
->assertJsonPath('pruned', 1)
379+
->assertJsonPath('results.0.external_payloads_deleted', 1);
380+
381+
Storage::disk('retention-object-payloads')->assertMissing($key);
382+
$this->assertNull(WorkflowRunSummary::find($runId));
383+
}
384+
335385
public function test_retention_pass_with_specific_run_ids(): void
336386
{
337387
$this->createNamespace('default');

0 commit comments

Comments
 (0)