Skip to content

Commit ad70216

Browse files
Revert "Run retention cleanup from worker heartbeats"
This reverts commit 4e79343.
1 parent 4e79343 commit ad70216

7 files changed

Lines changed: 161 additions & 284 deletions

File tree

app/Http/Controllers/Api/SystemController.php

Lines changed: 146 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,15 +2,20 @@
22

33
namespace App\Http\Controllers\Api;
44

5+
use App\Models\WorkflowNamespace;
56
use App\Support\ControlPlaneProtocol;
6-
use App\Support\HistoryRetentionEnforcer;
7+
use App\Support\NamespaceWorkflowScope;
78
use App\Support\ProjectionDriftMetrics;
89
use App\Support\WorkflowTaskFailureMetrics;
910
use Illuminate\Http\JsonResponse;
1011
use Illuminate\Http\Request;
12+
use Illuminate\Support\Facades\Log;
13+
use Workflow\V2\Enums\RunStatus;
14+
use Workflow\V2\Models\WorkflowRunSummary;
1115
use Workflow\V2\Support\ActivityTimeoutEnforcer;
1216
use Workflow\V2\Support\TaskRepairCandidates;
1317
use Workflow\V2\Support\TaskRepairPolicy;
18+
use Workflow\V2\Support\WorkflowRunRetentionCleanup;
1419
use Workflow\V2\TaskWatchdog;
1520

1621
class SystemController
@@ -203,9 +208,19 @@ public function retentionStatus(Request $request): JsonResponse
203208
]);
204209
$limit = min(100, (int) ($validated['limit'] ?? 100));
205210

206-
$retentionDays = HistoryRetentionEnforcer::retentionDays($namespace);
211+
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
212+
$retentionDays = $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
207213
$cutoff = now()->subDays($retentionDays);
208-
$expiredRunIds = HistoryRetentionEnforcer::expiredRunIds($namespace, $limit);
214+
215+
$expiredRunIds = NamespaceWorkflowScope::runSummaryQuery($namespace)
216+
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
217+
->whereNotNull('workflow_run_summaries.closed_at')
218+
->whereNull('workflow_run_summaries.archived_at')
219+
->where('workflow_run_summaries.closed_at', '<', $cutoff)
220+
->orderBy('workflow_run_summaries.closed_at')
221+
->limit($limit)
222+
->pluck('workflow_run_summaries.id')
223+
->all();
209224

210225
return ControlPlaneProtocol::json([
211226
'namespace' => $namespace,
@@ -237,10 +252,135 @@ public function retentionEnforcePass(Request $request): JsonResponse
237252
$validated['run_ids'] ?? [],
238253
));
239254

240-
$report = HistoryRetentionEnforcer::runPass($namespace, $limit, $runIds);
255+
if ($runIds === []) {
256+
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
257+
$retentionDays = $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
258+
$cutoff = now()->subDays($retentionDays);
259+
260+
$runIds = NamespaceWorkflowScope::runSummaryQuery($namespace)
261+
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
262+
->whereNotNull('workflow_run_summaries.closed_at')
263+
->whereNull('workflow_run_summaries.archived_at')
264+
->where('workflow_run_summaries.closed_at', '<', $cutoff)
265+
->orderBy('workflow_run_summaries.closed_at')
266+
->limit($limit)
267+
->pluck('workflow_run_summaries.id')
268+
->all();
269+
}
241270

242-
$hasFailures = $report['failed'] > 0;
271+
if ($runIds === []) {
272+
return ControlPlaneProtocol::json([
273+
'processed' => 0,
274+
'pruned' => 0,
275+
'skipped' => 0,
276+
'failed' => 0,
277+
'results' => [],
278+
]);
279+
}
243280

244-
return ControlPlaneProtocol::json($report, $hasFailures ? 207 : 200);
281+
$results = [];
282+
$pruned = 0;
283+
$skipped = 0;
284+
$failed = 0;
285+
286+
foreach ($runIds as $runId) {
287+
try {
288+
$result = $this->pruneRun($namespace, $runId);
289+
290+
if ($result['pruned']) {
291+
$pruned++;
292+
$results[] = [
293+
'run_id' => $runId,
294+
'outcome' => 'pruned',
295+
'history_events_deleted' => $result['history_events_deleted'],
296+
'tasks_deleted' => $result['tasks_deleted'],
297+
'deleted' => $result['deleted'],
298+
];
299+
} else {
300+
$skipped++;
301+
$results[] = [
302+
'run_id' => $runId,
303+
'outcome' => 'skipped',
304+
'reason' => $result['reason'],
305+
];
306+
}
307+
} catch (\Throwable $e) {
308+
$failed++;
309+
$results[] = [
310+
'run_id' => $runId,
311+
'outcome' => 'error',
312+
'reason' => $e->getMessage(),
313+
];
314+
}
315+
}
316+
317+
$hasFailures = $failed > 0;
318+
319+
return ControlPlaneProtocol::json([
320+
'processed' => count($runIds),
321+
'pruned' => $pruned,
322+
'skipped' => $skipped,
323+
'failed' => $failed,
324+
'results' => $results,
325+
], $hasFailures ? 207 : 200);
326+
}
327+
328+
/**
329+
* @return array{pruned: bool, reason: string|null, history_events_deleted: int, tasks_deleted: int, deleted: array<string, int>}
330+
*/
331+
private function pruneRun(string $namespace, string $runId): array
332+
{
333+
$summary = WorkflowRunSummary::query()
334+
->where('id', $runId)
335+
->where('namespace', $namespace)
336+
->first();
337+
338+
if (! $summary) {
339+
return $this->skippedRetentionResult('run_not_found');
340+
}
341+
342+
$status = is_string($summary->status) ? RunStatus::tryFrom($summary->status) : null;
343+
344+
if ($status === null || ! $status->isTerminal()) {
345+
return $this->skippedRetentionResult('run_not_terminal');
346+
}
347+
348+
if ($summary->archived_at !== null) {
349+
return $this->skippedRetentionResult('run_archived');
350+
}
351+
352+
// Audit log before deletion
353+
Log::info('retention_prune_run', [
354+
'namespace' => $namespace,
355+
'run_id' => $runId,
356+
'workflow_instance_id' => $summary->workflow_instance_id,
357+
'workflow_type' => $summary->workflow_type,
358+
'status' => $summary->status,
359+
'closed_at' => $summary->closed_at?->toIso8601String(),
360+
]);
361+
362+
$report = WorkflowRunRetentionCleanup::pruneRun($runId);
363+
364+
return [
365+
'pruned' => true,
366+
'reason' => null,
367+
'history_events_deleted' => $report['history_events_deleted'] ?? 0,
368+
'tasks_deleted' => $report['tasks_deleted'] ?? 0,
369+
'deleted' => $report,
370+
];
371+
}
372+
373+
/**
374+
* @return array{pruned: false, reason: string, history_events_deleted: 0, tasks_deleted: 0, deleted: array<string, int>}
375+
*/
376+
private function skippedRetentionResult(string $reason): array
377+
{
378+
return [
379+
'pruned' => false,
380+
'reason' => $reason,
381+
'history_events_deleted' => 0,
382+
'tasks_deleted' => 0,
383+
'deleted' => [],
384+
];
245385
}
246386
}

app/Http/Controllers/Api/WorkerController.php

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@
33
namespace App\Http\Controllers\Api;
44

55
use App\Models\WorkerRegistration;
6-
use App\Support\HistoryRetentionEnforcer;
76
use App\Support\NamespaceWorkflowScope;
87
use App\Support\QueryTaskQueueUnavailableException;
98
use App\Support\WorkerProtocol;
@@ -251,12 +250,9 @@ public function heartbeat(Request $request): JsonResponse
251250
buildId: is_string($worker->build_id) ? $worker->build_id : null,
252251
);
253252

254-
$retention = HistoryRetentionEnforcer::runInlinePass($namespace);
255-
256253
return WorkerProtocol::json([
257254
'worker_id' => $worker->worker_id,
258255
'acknowledged' => true,
259-
'retention' => $retention,
260256
]);
261257
}
262258

app/Support/HistoryRetentionEnforcer.php

Lines changed: 0 additions & 193 deletions
This file was deleted.

0 commit comments

Comments
 (0)