Skip to content

Commit dbea371

Browse files
Run retention cleanup from worker heartbeats
1 parent b6b1d7f commit dbea371

9 files changed

Lines changed: 291 additions & 165 deletions

File tree

.github/workflows/server-perf.yml

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ on:
77
- ".github/workflows/server-perf.yml"
88
- "Dockerfile"
99
- "docker-compose.yml"
10+
- "app/Support/HistoryRetentionEnforcer.php"
1011
- "app/Support/ServerPollingCache.php"
1112
- "app/Support/WorkflowTaskPoller.php"
1213
- "app/Support/WorkflowTaskPollRequestStore.php"
@@ -22,6 +23,7 @@ on:
2223
- ".github/workflows/server-perf.yml"
2324
- "Dockerfile"
2425
- "docker-compose.yml"
26+
- "app/Support/HistoryRetentionEnforcer.php"
2527
- "app/Support/ServerPollingCache.php"
2628
- "app/Support/WorkflowTaskPoller.php"
2729
- "app/Support/WorkflowTaskPollRequestStore.php"
@@ -107,8 +109,8 @@ jobs:
107109
DW_PERF_MAX_SERVER_MEMORY_MB: "768"
108110
DW_PERF_MAX_POLLING_KEYS: "512"
109111
DW_PERF_MAX_FINAL_POLLING_KEYS: "0"
110-
DW_PERF_MAX_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":512,"long_poll_signals":512,"workflow_query_tasks":64,"task_queue_admission_locks":128,"task_queue_dispatch_counters":128,"workflow_task_expired_lease_recovery":128,"readiness_probe":16}'
111-
DW_PERF_MAX_FINAL_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":0,"long_poll_signals":0,"workflow_query_tasks":0,"task_queue_admission_locks":0,"task_queue_dispatch_counters":0,"workflow_task_expired_lease_recovery":0,"readiness_probe":0}'
112+
DW_PERF_MAX_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":512,"long_poll_signals":512,"workflow_query_tasks":64,"task_queue_admission_locks":128,"task_queue_dispatch_counters":128,"workflow_task_expired_lease_recovery":128,"history_retention_inline":64,"readiness_probe":16}'
113+
DW_PERF_MAX_FINAL_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":0,"long_poll_signals":0,"workflow_query_tasks":0,"task_queue_admission_locks":0,"task_queue_dispatch_counters":0,"workflow_task_expired_lease_recovery":0,"history_retention_inline":0,"readiness_probe":0}'
112114
run: scripts/perf/run-server-soak.sh
113115

114116
- name: Upload perf artifacts
@@ -138,8 +140,8 @@ jobs:
138140
DW_PERF_MAX_SERVER_MEMORY_MB: "1024"
139141
DW_PERF_MAX_POLLING_KEYS: "2048"
140142
DW_PERF_MAX_FINAL_POLLING_KEYS: "0"
141-
DW_PERF_MAX_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":2048,"long_poll_signals":2048,"workflow_query_tasks":128,"task_queue_admission_locks":256,"task_queue_dispatch_counters":256,"workflow_task_expired_lease_recovery":256,"readiness_probe":32}'
142-
DW_PERF_MAX_FINAL_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":0,"long_poll_signals":0,"workflow_query_tasks":0,"task_queue_admission_locks":0,"task_queue_dispatch_counters":0,"workflow_task_expired_lease_recovery":0,"readiness_probe":0}'
143+
DW_PERF_MAX_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":2048,"long_poll_signals":2048,"workflow_query_tasks":128,"task_queue_admission_locks":256,"task_queue_dispatch_counters":256,"workflow_task_expired_lease_recovery":256,"history_retention_inline":128,"readiness_probe":32}'
144+
DW_PERF_MAX_FINAL_SERVER_CACHE_KEYS_BY_POLICY: '{"workflow_task_poll_requests":0,"long_poll_signals":0,"workflow_query_tasks":0,"task_queue_admission_locks":0,"task_queue_dispatch_counters":0,"workflow_task_expired_lease_recovery":0,"history_retention_inline":0,"readiness_probe":0}'
143145
DW_PERF_MAX_SERVER_MEMORY_SLOPE_MB_HOUR: "128"
144146
DW_PERF_REMOTE_WRITE_ENABLED: ${{ github.event_name != 'workflow_dispatch' || inputs.remote_write }}
145147
DW_PERF_REMOTE_WRITE_URL: ${{ vars.DW_PERF_REMOTE_WRITE_URL }}

app/Http/Controllers/Api/SystemController.php

Lines changed: 6 additions & 146 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,15 @@
22

33
namespace App\Http\Controllers\Api;
44

5-
use App\Models\WorkflowNamespace;
65
use App\Support\ControlPlaneProtocol;
7-
use App\Support\NamespaceWorkflowScope;
6+
use App\Support\HistoryRetentionEnforcer;
87
use App\Support\ProjectionDriftMetrics;
98
use App\Support\WorkflowTaskFailureMetrics;
109
use Illuminate\Http\JsonResponse;
1110
use Illuminate\Http\Request;
12-
use Illuminate\Support\Facades\Log;
13-
use Workflow\V2\Enums\RunStatus;
14-
use Workflow\V2\Models\WorkflowRunSummary;
1511
use Workflow\V2\Support\ActivityTimeoutEnforcer;
1612
use Workflow\V2\Support\TaskRepairCandidates;
1713
use Workflow\V2\Support\TaskRepairPolicy;
18-
use Workflow\V2\Support\WorkflowRunRetentionCleanup;
1914
use Workflow\V2\TaskWatchdog;
2015

2116
class SystemController
@@ -208,19 +203,9 @@ public function retentionStatus(Request $request): JsonResponse
208203
]);
209204
$limit = min(100, (int) ($validated['limit'] ?? 100));
210205

211-
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
212-
$retentionDays = $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
206+
$retentionDays = HistoryRetentionEnforcer::retentionDays($namespace);
213207
$cutoff = now()->subDays($retentionDays);
214-
215-
$expiredRunIds = NamespaceWorkflowScope::runSummaryQuery($namespace)
216-
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
217-
->whereNotNull('workflow_run_summaries.closed_at')
218-
->whereNull('workflow_run_summaries.archived_at')
219-
->where('workflow_run_summaries.closed_at', '<', $cutoff)
220-
->orderBy('workflow_run_summaries.closed_at')
221-
->limit($limit)
222-
->pluck('workflow_run_summaries.id')
223-
->all();
208+
$expiredRunIds = HistoryRetentionEnforcer::expiredRunIds($namespace, $limit);
224209

225210
return ControlPlaneProtocol::json([
226211
'namespace' => $namespace,
@@ -252,135 +237,10 @@ public function retentionEnforcePass(Request $request): JsonResponse
252237
$validated['run_ids'] ?? [],
253238
));
254239

255-
if ($runIds === []) {
256-
$ns = WorkflowNamespace::query()->where('name', $namespace)->first();
257-
$retentionDays = $ns?->retention_days ?? (int) config('server.history.retention_days', 30);
258-
$cutoff = now()->subDays($retentionDays);
259-
260-
$runIds = NamespaceWorkflowScope::runSummaryQuery($namespace)
261-
->whereIn('workflow_run_summaries.status_bucket', ['completed', 'failed'])
262-
->whereNotNull('workflow_run_summaries.closed_at')
263-
->whereNull('workflow_run_summaries.archived_at')
264-
->where('workflow_run_summaries.closed_at', '<', $cutoff)
265-
->orderBy('workflow_run_summaries.closed_at')
266-
->limit($limit)
267-
->pluck('workflow_run_summaries.id')
268-
->all();
269-
}
270-
271-
if ($runIds === []) {
272-
return ControlPlaneProtocol::json([
273-
'processed' => 0,
274-
'pruned' => 0,
275-
'skipped' => 0,
276-
'failed' => 0,
277-
'results' => [],
278-
]);
279-
}
280-
281-
$results = [];
282-
$pruned = 0;
283-
$skipped = 0;
284-
$failed = 0;
285-
286-
foreach ($runIds as $runId) {
287-
try {
288-
$result = $this->pruneRun($namespace, $runId);
289-
290-
if ($result['pruned']) {
291-
$pruned++;
292-
$results[] = [
293-
'run_id' => $runId,
294-
'outcome' => 'pruned',
295-
'history_events_deleted' => $result['history_events_deleted'],
296-
'tasks_deleted' => $result['tasks_deleted'],
297-
'deleted' => $result['deleted'],
298-
];
299-
} else {
300-
$skipped++;
301-
$results[] = [
302-
'run_id' => $runId,
303-
'outcome' => 'skipped',
304-
'reason' => $result['reason'],
305-
];
306-
}
307-
} catch (\Throwable $e) {
308-
$failed++;
309-
$results[] = [
310-
'run_id' => $runId,
311-
'outcome' => 'error',
312-
'reason' => $e->getMessage(),
313-
];
314-
}
315-
}
316-
317-
$hasFailures = $failed > 0;
318-
319-
return ControlPlaneProtocol::json([
320-
'processed' => count($runIds),
321-
'pruned' => $pruned,
322-
'skipped' => $skipped,
323-
'failed' => $failed,
324-
'results' => $results,
325-
], $hasFailures ? 207 : 200);
326-
}
327-
328-
/**
329-
* @return array{pruned: bool, reason: string|null, history_events_deleted: int, tasks_deleted: int, deleted: array<string, int>}
330-
*/
331-
private function pruneRun(string $namespace, string $runId): array
332-
{
333-
$summary = WorkflowRunSummary::query()
334-
->where('id', $runId)
335-
->where('namespace', $namespace)
336-
->first();
337-
338-
if (! $summary) {
339-
return $this->skippedRetentionResult('run_not_found');
340-
}
240+
$report = HistoryRetentionEnforcer::runPass($namespace, $limit, $runIds);
341241

342-
$status = is_string($summary->status) ? RunStatus::tryFrom($summary->status) : null;
242+
$hasFailures = $report['failed'] > 0;
343243

344-
if ($status === null || ! $status->isTerminal()) {
345-
return $this->skippedRetentionResult('run_not_terminal');
346-
}
347-
348-
if ($summary->archived_at !== null) {
349-
return $this->skippedRetentionResult('run_archived');
350-
}
351-
352-
// Audit log before deletion
353-
Log::info('retention_prune_run', [
354-
'namespace' => $namespace,
355-
'run_id' => $runId,
356-
'workflow_instance_id' => $summary->workflow_instance_id,
357-
'workflow_type' => $summary->workflow_type,
358-
'status' => $summary->status,
359-
'closed_at' => $summary->closed_at?->toIso8601String(),
360-
]);
361-
362-
$report = WorkflowRunRetentionCleanup::pruneRun($runId);
363-
364-
return [
365-
'pruned' => true,
366-
'reason' => null,
367-
'history_events_deleted' => $report['history_events_deleted'] ?? 0,
368-
'tasks_deleted' => $report['tasks_deleted'] ?? 0,
369-
'deleted' => $report,
370-
];
371-
}
372-
373-
/**
374-
* @return array{pruned: false, reason: string, history_events_deleted: 0, tasks_deleted: 0, deleted: array<string, int>}
375-
*/
376-
private function skippedRetentionResult(string $reason): array
377-
{
378-
return [
379-
'pruned' => false,
380-
'reason' => $reason,
381-
'history_events_deleted' => 0,
382-
'tasks_deleted' => 0,
383-
'deleted' => [],
384-
];
244+
return ControlPlaneProtocol::json($report, $hasFailures ? 207 : 200);
385245
}
386246
}

app/Http/Controllers/Api/WorkerController.php

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
namespace App\Http\Controllers\Api;
44

55
use App\Models\WorkerRegistration;
6+
use App\Support\HistoryRetentionEnforcer;
67
use App\Support\NamespaceWorkflowScope;
78
use App\Support\QueryTaskQueueUnavailableException;
89
use App\Support\WorkerProtocol;
@@ -250,9 +251,12 @@ public function heartbeat(Request $request): JsonResponse
250251
buildId: is_string($worker->build_id) ? $worker->build_id : null,
251252
);
252253

254+
$retention = HistoryRetentionEnforcer::runInlinePass($namespace);
255+
253256
return WorkerProtocol::json([
254257
'worker_id' => $worker->worker_id,
255258
'acknowledged' => true,
259+
'retention' => $retention,
256260
]);
257261
}
258262

0 commit comments

Comments
 (0)