Skip to content

Commit 4e79343

Browse files
Run retention cleanup from worker heartbeats
1 parent 066adca commit 4e79343

7 files changed

Lines changed: 284 additions & 161 deletions

File tree

app/Http/Controllers/Api/SystemController.php

Lines changed: 6 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,15 @@
22

33
namespace App\Http\Controllers\Api;
44

5-
use App\Models\WorkflowNamespace;
65
use App\Support\ControlPlaneProtocol;
7-
use App\Support\NamespaceWorkflowScope;
6+
use App\Support\HistoryRetentionEnforcer;
87
use App\Support\ProjectionDriftMetrics;
98
use App\Support\WorkflowTaskFailureMetrics;
109
use Illuminate\Http\JsonResponse;
1110
use Illuminate\Http\Request;
12-
use Illuminate\Support\Facades\Log;
13-
use Workflow\V2\Enums\RunStatus;
14-
use Workflow\V2\Models\WorkflowRunSummary;
1511
use Workflow\V2\Support\ActivityTimeoutEnforcer;
1612
use Workflow\V2\Support\TaskRepairCandidates;
1713
use Workflow\V2\Support\TaskRepairPolicy;
18-
use Workflow\V2\Support\WorkflowRunRetentionCleanup;
1914
use Workflow\V2\TaskWatchdog;
2015

2116
class SystemController
@@ -208,19 +203,9 @@ public function retentionStatus(Request $request): JsonResponse
208203
]);
209204
$limit = min(100, (int) ($validated['limit'] ?? 100));
210205

211-
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
212-
$retentionDays = $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
206+
$retentionDays = HistoryRetentionEnforcer::retentionDays($namespace);
213207
$cutoff = now()->subDays($retentionDays);
214-
215-
$expiredRunIds = NamespaceWorkflowScope::runSummaryQuery($namespace)
216-
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
217-
->whereNotNull('workflow_run_summaries.closed_at')
218-
->whereNull('workflow_run_summaries.archived_at')
219-
->where('workflow_run_summaries.closed_at', '<', $cutoff)
220-
->orderBy('workflow_run_summaries.closed_at')
221-
->limit($limit)
222-
->pluck('workflow_run_summaries.id')
223-
->all();
208+
$expiredRunIds = HistoryRetentionEnforcer::expiredRunIds($namespace, $limit);
224209

225210
return ControlPlaneProtocol::json([
226211
'namespace' => $namespace,
@@ -252,135 +237,10 @@ public function retentionEnforcePass(Request $request): JsonResponse
252237
$validated['run_ids'] ?? [],
253238
));
254239

255-
if ($runIds === []) {
256-
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
257-
$retentionDays = $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
258-
$cutoff = now()->subDays($retentionDays);
259-
260-
$runIds = NamespaceWorkflowScope::runSummaryQuery($namespace)
261-
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
262-
->whereNotNull('workflow_run_summaries.closed_at')
263-
->whereNull('workflow_run_summaries.archived_at')
264-
->where('workflow_run_summaries.closed_at', '<', $cutoff)
265-
->orderBy('workflow_run_summaries.closed_at')
266-
->limit($limit)
267-
->pluck('workflow_run_summaries.id')
268-
->all();
269-
}
270-
271-
if ($runIds === []) {
272-
return ControlPlaneProtocol::json([
273-
'processed' => 0,
274-
'pruned' => 0,
275-
'skipped' => 0,
276-
'failed' => 0,
277-
'results' => [],
278-
]);
279-
}
280-
281-
$results = [];
282-
$pruned = 0;
283-
$skipped = 0;
284-
$failed = 0;
285-
286-
foreach ($runIds as $runId) {
287-
try {
288-
$result = $this->pruneRun($namespace, $runId);
289-
290-
if ($result['pruned']) {
291-
$pruned++;
292-
$results[] = [
293-
'run_id' => $runId,
294-
'outcome' => 'pruned',
295-
'history_events_deleted' => $result['history_events_deleted'],
296-
'tasks_deleted' => $result['tasks_deleted'],
297-
'deleted' => $result['deleted'],
298-
];
299-
} else {
300-
$skipped++;
301-
$results[] = [
302-
'run_id' => $runId,
303-
'outcome' => 'skipped',
304-
'reason' => $result['reason'],
305-
];
306-
}
307-
} catch (\Throwable $e) {
308-
$failed++;
309-
$results[] = [
310-
'run_id' => $runId,
311-
'outcome' => 'error',
312-
'reason' => $e->getMessage(),
313-
];
314-
}
315-
}
316-
317-
$hasFailures = $failed > 0;
318-
319-
return ControlPlaneProtocol::json([
320-
'processed' => count($runIds),
321-
'pruned' => $pruned,
322-
'skipped' => $skipped,
323-
'failed' => $failed,
324-
'results' => $results,
325-
], $hasFailures ? 207 : 200);
326-
}
327-
328-
/**
329-
* @return array{pruned: bool, reason: string|null, history_events_deleted: int, tasks_deleted: int, deleted: array<string, int>}
330-
*/
331-
private function pruneRun(string $namespace, string $runId): array
332-
{
333-
$summary = WorkflowRunSummary::query()
334-
->where('id', $runId)
335-
->where('namespace', $namespace)
336-
->first();
337-
338-
if (! $summary) {
339-
return $this->skippedRetentionResult('run_not_found');
340-
}
240+
$report = HistoryRetentionEnforcer::runPass($namespace, $limit, $runIds);
341241

342-
$status = is_string($summary->status) ? RunStatus::tryFrom($summary->status) : null;
242+
$hasFailures = $report['failed'] > 0;
343243

344-
if ($status === null || ! $status->isTerminal()) {
345-
return $this->skippedRetentionResult('run_not_terminal');
346-
}
347-
348-
if ($summary->archived_at !== null) {
349-
return $this->skippedRetentionResult('run_archived');
350-
}
351-
352-
// Audit log before deletion
353-
Log::info('retention_prune_run', [
354-
'namespace' => $namespace,
355-
'run_id' => $runId,
356-
'workflow_instance_id' => $summary->workflow_instance_id,
357-
'workflow_type' => $summary->workflow_type,
358-
'status' => $summary->status,
359-
'closed_at' => $summary->closed_at?->toIso8601String(),
360-
]);
361-
362-
$report = WorkflowRunRetentionCleanup::pruneRun($runId);
363-
364-
return [
365-
'pruned' => true,
366-
'reason' => null,
367-
'history_events_deleted' => $report['history_events_deleted'] ?? 0,
368-
'tasks_deleted' => $report['tasks_deleted'] ?? 0,
369-
'deleted' => $report,
370-
];
371-
}
372-
373-
/**
374-
* @return array{pruned: false, reason: string, history_events_deleted: 0, tasks_deleted: 0, deleted: array<string, int>}
375-
*/
376-
private function skippedRetentionResult(string $reason): array
377-
{
378-
return [
379-
'pruned' => false,
380-
'reason' => $reason,
381-
'history_events_deleted' => 0,
382-
'tasks_deleted' => 0,
383-
'deleted' => [],
384-
];
244+
return ControlPlaneProtocol::json($report, $hasFailures ? 207 : 200);
385245
}
386246
}

app/Http/Controllers/Api/WorkerController.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace App\Http\Controllers\Api;
44

55
use App\Models\WorkerRegistration;
6+
use App\Support\HistoryRetentionEnforcer;
67
use App\Support\NamespaceWorkflowScope;
78
use App\Support\QueryTaskQueueUnavailableException;
89
use App\Support\WorkerProtocol;
@@ -250,9 +251,12 @@ public function heartbeat(Request $request): JsonResponse
250251
buildId: is_string($worker->build_id) ? $worker->build_id : null,
251252
);
252253

254+
$retention = HistoryRetentionEnforcer::runInlinePass($namespace);
255+
253256
return WorkerProtocol::json([
254257
'worker_id' => $worker->worker_id,
255258
'acknowledged' => true,
259+
'retention' => $retention,
256260
]);
257261
}
258262

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
use App\Models\WorkflowNamespace;
6+
use Illuminate\Support\Facades\Cache;
7+
use Illuminate\Support\Facades\Log;
8+
use Workflow\V2\Enums\RunStatus;
9+
use Workflow\V2\Models\WorkflowRunSummary;
10+
use Workflow\V2\Support\WorkflowRunRetentionCleanup;
11+
12+
class HistoryRetentionEnforcer
13+
{
14+
private const INLINE_CACHE_PREFIX = 'server:history-retention-inline:';
15+
16+
private const INLINE_THROTTLE_SECONDS = 60;
17+
18+
private const INLINE_LIMIT = 1;
19+
20+
/**
21+
* @return list<string>
22+
*/
23+
public static function expiredRunIds(string $namespace, int $limit): array
24+
{
25+
$limit = max(1, min(100, $limit));
26+
$retentionDays = self::retentionDays($namespace);
27+
$cutoff = now()->subDays($retentionDays);
28+
29+
return NamespaceWorkflowScope::runSummaryQuery($namespace)
30+
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
31+
->whereNotNull('workflow_run_summaries.closed_at')
32+
->whereNull('workflow_run_summaries.archived_at')
33+
->where('workflow_run_summaries.closed_at', '<', $cutoff)
34+
->orderBy('workflow_run_summaries.closed_at')
35+
->limit($limit)
36+
->pluck('workflow_run_summaries.id')
37+
->all();
38+
}
39+
40+
public static function retentionDays(string $namespace): int
41+
{
42+
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
43+
44+
return $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
45+
}
46+
47+
/**
48+
* @param list<string> $runIds
49+
* @return array{processed: int, pruned: int, skipped: int, failed: int, results: list<array<string, mixed>>}
50+
*/
51+
public static function runPass(string $namespace, int $limit = 100, array $runIds = []): array
52+
{
53+
$runIds = $runIds === []
54+
? self::expiredRunIds($namespace, $limit)
55+
: array_values($runIds);
56+
57+
$results = [];
58+
$pruned = 0;
59+
$skipped = 0;
60+
$failed = 0;
61+
62+
foreach ($runIds as $runId) {
63+
try {
64+
$result = self::pruneRun($namespace, $runId);
65+
66+
if ($result['pruned']) {
67+
$pruned++;
68+
$results[] = [
69+
'run_id' => $runId,
70+
'outcome' => 'pruned',
71+
'history_events_deleted' => $result['history_events_deleted'],
72+
'tasks_deleted' => $result['tasks_deleted'],
73+
'deleted' => $result['deleted'],
74+
];
75+
} else {
76+
$skipped++;
77+
$results[] = [
78+
'run_id' => $runId,
79+
'outcome' => 'skipped',
80+
'reason' => $result['reason'],
81+
];
82+
}
83+
} catch (\Throwable $e) {
84+
$failed++;
85+
$results[] = [
86+
'run_id' => $runId,
87+
'outcome' => 'error',
88+
'reason' => $e->getMessage(),
89+
];
90+
}
91+
}
92+
93+
return [
94+
'processed' => count($runIds),
95+
'pruned' => $pruned,
96+
'skipped' => $skipped,
97+
'failed' => $failed,
98+
'results' => $results,
99+
];
100+
}
101+
102+
/**
103+
* Run a tiny retention pass from ordinary worker traffic.
104+
*
105+
* This is intentionally bounded and throttled. The explicit API/console
106+
* pass remains available for operators, but active deployments no longer
107+
* depend on a separate scheduler for all retention progress.
108+
*
109+
* @return array{throttled: bool, processed: int, pruned: int, skipped: int, failed: int}
110+
*/
111+
public static function runInlinePass(string $namespace): array
112+
{
113+
$key = self::INLINE_CACHE_PREFIX.sha1($namespace);
114+
115+
if (! Cache::add($key, '1', now()->addSeconds(self::INLINE_THROTTLE_SECONDS))) {
116+
return [
117+
'throttled' => true,
118+
'processed' => 0,
119+
'pruned' => 0,
120+
'skipped' => 0,
121+
'failed' => 0,
122+
];
123+
}
124+
125+
$report = self::runPass($namespace, self::INLINE_LIMIT);
126+
127+
return [
128+
'throttled' => false,
129+
'processed' => $report['processed'],
130+
'pruned' => $report['pruned'],
131+
'skipped' => $report['skipped'],
132+
'failed' => $report['failed'],
133+
];
134+
}
135+
136+
/**
137+
* @return array{pruned: bool, reason: string|null, history_events_deleted: int, tasks_deleted: int, deleted: array<string, int>}
138+
*/
139+
public static function pruneRun(string $namespace, string $runId): array
140+
{
141+
$summary = WorkflowRunSummary::query()
142+
->where('id', $runId)
143+
->where('namespace', $namespace)
144+
->first();
145+
146+
if (! $summary) {
147+
return self::skippedRetentionResult('run_not_found');
148+
}
149+
150+
$status = is_string($summary->status) ? RunStatus::tryFrom($summary->status) : null;
151+
152+
if ($status === null || ! $status->isTerminal()) {
153+
return self::skippedRetentionResult('run_not_terminal');
154+
}
155+
156+
if ($summary->archived_at !== null) {
157+
return self::skippedRetentionResult('run_archived');
158+
}
159+
160+
Log::info('retention_prune_run', [
161+
'namespace' => $namespace,
162+
'run_id' => $runId,
163+
'workflow_instance_id' => $summary->workflow_instance_id,
164+
'workflow_type' => $summary->workflow_type,
165+
'status' => $summary->status,
166+
'closed_at' => $summary->closed_at?->toIso8601String(),
167+
]);
168+
169+
$report = WorkflowRunRetentionCleanup::pruneRun($runId);
170+
171+
return [
172+
'pruned' => true,
173+
'reason' => null,
174+
'history_events_deleted' => $report['history_events_deleted'] ?? 0,
175+
'tasks_deleted' => $report['tasks_deleted'] ?? 0,
176+
'deleted' => $report,
177+
];
178+
}
179+
180+
/**
181+
* @return array{pruned: false, reason: string, history_events_deleted: 0, tasks_deleted: 0, deleted: array<string, int>}
182+
*/
183+
private static function skippedRetentionResult(string $reason): array
184+
{
185+
return [
186+
'pruned' => false,
187+
'reason' => $reason,
188+
'history_events_deleted' => 0,
189+
'tasks_deleted' => 0,
190+
'deleted' => [],
191+
];
192+
}
193+
}

0 commit comments

Comments
 (0)