Skip to content

Commit d761707

Browse files
Block workflow starts on drained task queues without an active cohort
Block workflow starts on drained task queues
1 parent e457c4a commit d761707

5 files changed

Lines changed: 293 additions & 4 deletions

File tree

README.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -537,6 +537,13 @@ cannot delete yet.
537537
- `POST /api/workflows/{id}/cancel` — Request cancellation
538538
- `POST /api/workflows/{id}/terminate` — Terminate immediately
539539

540+
Workflow starts fail closed with `409` / `reason: "task_queue_draining"`
541+
when the requested task queue has been explicitly drained and no active worker
542+
cohort remains to claim new work. The response includes
543+
`routing_status`, worker counts, and `draining_build_ids` so operators can
544+
distinguish "wait for the active cohort to return" from "resume or replace the
545+
drained build cohort first."
546+
540547
Workflow debug responses are capped support snapshots, not full run exports:
541548
the server fetches at most 25 pending workflow tasks, 25 pending activities
542549
with only each activity's current/latest attempt, and 10 recent failures. The

app/Http/Controllers/Api/WorkflowController.php

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
use App\Support\ControlPlaneResultMapper;
88
use App\Support\NamespaceExternalPayloadStorage;
99
use App\Support\NamespaceWorkflowScope;
10+
use App\Support\TaskQueueRoutingGate;
1011
use App\Support\WorkflowCommandContextFactory;
1112
use App\Support\WorkflowQueryTaskBroker;
1213
use App\Support\WorkflowRunDiagnostics;
@@ -27,6 +28,7 @@ class WorkflowController
2728
public function __construct(
2829
private readonly WorkflowStartService $workflowStartService,
2930
private readonly WorkflowControlPlane $workflowControlPlane,
31+
private readonly TaskQueueRoutingGate $taskQueueRoutingGate,
3032
private readonly WorkflowCommandContextFactory $commandContexts,
3133
private readonly ControlPlaneResultMapper $resultMapper,
3234
private readonly WorkflowQueryTaskBroker $queryTasks,
@@ -170,6 +172,33 @@ public function start(Request $request): JsonResponse
170172
], 409);
171173
}
172174

175+
$taskQueue = isset($validated['task_queue']) && is_string($validated['task_queue'])
176+
? trim($validated['task_queue'])
177+
: null;
178+
179+
if ($taskQueue !== null && $taskQueue !== '') {
180+
$routingBlock = $this->taskQueueRoutingGate->workflowStartBlock((string) $namespace, $taskQueue);
181+
182+
if ($routingBlock !== null) {
183+
return ControlPlaneProtocol::jsonForRequest($request, array_filter([
184+
'workflow_id' => $workflowId,
185+
'workflow_type' => $validated['workflow_type'],
186+
'task_queue' => $taskQueue,
187+
'message' => sprintf(
188+
'Task queue [%s] is draining and cannot accept new workflow starts until an active worker cohort is available.',
189+
$taskQueue,
190+
),
191+
'reason' => 'task_queue_draining',
192+
'routing_status' => $routingBlock['routing_status'],
193+
'active_worker_count' => $routingBlock['active_worker_count'],
194+
'draining_worker_count' => $routingBlock['draining_worker_count'],
195+
'stale_worker_count' => $routingBlock['stale_worker_count'],
196+
'draining_build_ids' => $routingBlock['draining_build_ids'],
197+
'drain_intent' => 'draining',
198+
], static fn (mixed $value): bool => $value !== null), 409);
199+
}
200+
}
201+
173202
try {
174203
$start = $this->workflowStartService->start(
175204
$validated,
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
<?php
2+
3+
namespace App\Support;
4+
5+
use App\Models\WorkerBuildIdRollout;
6+
use App\Models\WorkerRegistration;
7+
use Workflow\V2\Support\StandaloneWorkerVisibility;
8+
9+
/**
 * Decides whether a workflow start must be refused because the requested task
 * queue only has draining capacity left.
 *
 * The decision is derived from two tables, both filtered by namespace and task
 * queue: worker registrations (status + last heartbeat) and build-id rollouts
 * whose drain intent is "draining".
 */
final class TaskQueueRoutingGate
{
    /**
     * Describe why a workflow start on the given task queue should be blocked.
     *
     * Workers are classified in this order:
     *  - stale:    heartbeat is present and older than the staleness cutoff
     *              (regardless of the worker's status);
     *  - draining: not stale and status equals the draining drain-intent value;
     *  - active:   everything else, including workers whose status is not a
     *              string.
     *
     * The start is allowed (null is returned) when at least one active worker
     * exists, or when there is neither a draining worker nor a draining
     * build-id rollout for the queue. Otherwise a diagnostic payload is
     * returned for the caller to surface.
     *
     * NOTE(review): a worker with a null `last_heartbeat_at` is never treated
     * as stale, so it counts as active or draining depending on its status —
     * confirm this is intended for workers that have not heartbeated yet.
     *
     * @return array{
     *     routing_status: string,
     *     active_worker_count: int,
     *     draining_worker_count: int,
     *     stale_worker_count: int,
     *     draining_build_ids: list<string|null>
     * }|null
     */
    public function workflowStartBlock(string $namespace, string $taskQueue): ?array
    {
        $cutoff = now()->subSeconds($this->workerStaleAfterSeconds());
        $activeWorkerCount = 0;
        $drainingWorkerCount = 0;
        $staleWorkerCount = 0;

        $workers = WorkerRegistration::query()
            ->where('namespace', $namespace)
            ->where('task_queue', $taskQueue)
            ->get(['status', 'last_heartbeat_at']);

        foreach ($workers as $worker) {
            $status = is_string($worker->status) ? $worker->status : 'active';
            $heartbeat = $worker->last_heartbeat_at;

            // Staleness wins over status: a silent worker cannot claim work,
            // whether it was active or draining when it last reported in.
            if ($heartbeat !== null && $heartbeat->lt($cutoff)) {
                $staleWorkerCount++;

                continue;
            }

            if ($status === WorkerBuildIdRollout::DRAIN_INTENT_DRAINING) {
                $drainingWorkerCount++;

                continue;
            }

            $activeWorkerCount++;
        }

        // An active worker makes the start routable no matter what is
        // draining, so return before paying for the rollout lookup. This is
        // the common path for every workflow start on a healthy queue.
        if ($activeWorkerCount > 0) {
            return null;
        }

        $drainingBuildIds = WorkerBuildIdRollout::query()
            ->where('namespace', $namespace)
            ->where('task_queue', $taskQueue)
            ->where('drain_intent', WorkerBuildIdRollout::DRAIN_INTENT_DRAINING)
            ->orderBy('build_id')
            ->get()
            ->map(static fn (WorkerBuildIdRollout $rollout): ?string => $rollout->publicBuildId())
            ->values()
            ->all();

        // No active workers, but nothing is draining either: the queue is
        // merely idle or unknown, which this gate does not block.
        if ($drainingWorkerCount === 0 && $drainingBuildIds === []) {
            return null;
        }

        return [
            'routing_status' => 'draining',
            'active_worker_count' => $activeWorkerCount,
            'draining_worker_count' => $drainingWorkerCount,
            'stale_worker_count' => $staleWorkerCount,
            'draining_build_ids' => $drainingBuildIds,
        ];
    }

    /**
     * Resolve the heartbeat age, in seconds, after which a worker is stale.
     *
     * Reads `server.workers.stale_after_seconds` and `server.polling.timeout`
     * from config and hands each to StandaloneWorkerVisibility as an int, or
     * as null when the configured value is not numeric. How the two values
     * are combined is decided by that helper.
     */
    private function workerStaleAfterSeconds(): int
    {
        $configured = config('server.workers.stale_after_seconds');
        $pollingTimeout = config('server.polling.timeout');

        return StandaloneWorkerVisibility::staleAfterSeconds(
            is_numeric($configured) ? (int) $configured : null,
            is_numeric($pollingTimeout) ? (int) $pollingTimeout : null,
        );
    }
}

tests/Feature/ControlPlaneErrorContractTest.php

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,15 @@
22

33
namespace Tests\Feature;
44

5+
use App\Models\WorkerBuildIdRollout;
6+
use App\Models\WorkerRegistration;
57
use App\Support\ControlPlaneProtocol;
68
use App\Support\WorkerProtocol;
79
use Illuminate\Foundation\Testing\RefreshDatabase;
810
use Illuminate\Testing\TestResponse;
911
use PHPUnit\Framework\Attributes\DataProvider;
1012
use Tests\Feature\Concerns\ServerTestHelpers;
13+
use Tests\Fixtures\AwaitApprovalWorkflow;
1114
use Tests\TestCase;
1215

1316
class ControlPlaneErrorContractTest extends TestCase
@@ -246,6 +249,57 @@ public function test_namespace_duplicate_errors_are_machine_readable_and_version
246249
->assertJsonPath('namespace', 'default');
247250
}
248251

252+
/**
 * Starting a workflow on a queue whose only fresh worker is draining must
 * return the control-plane-versioned 409 error body, with no worker-protocol
 * header or fields in the response.
 */
public function test_workflow_start_draining_queue_errors_are_machine_readable_and_versioned(): void
{
    $workflowType = 'tests.await-approval-workflow';
    $queue = 'drain-queue';
    $buildId = 'build-draining';

    $this->configureWorkflowTypes([
        $workflowType => AwaitApprovalWorkflow::class,
    ]);

    // One worker with a fresh heartbeat whose status is "draining".
    WorkerRegistration::query()->create([
        'worker_id' => 'draining-worker',
        'namespace' => 'default',
        'task_queue' => $queue,
        'runtime' => 'php',
        'sdk_version' => '1.0.0',
        'build_id' => $buildId,
        'supported_workflow_types' => [$workflowType],
        'workflow_definition_fingerprints' => [],
        'supported_activity_types' => [],
        'max_concurrent_workflow_tasks' => 100,
        'max_concurrent_activity_tasks' => 100,
        'last_heartbeat_at' => now(),
        'status' => 'draining',
    ]);

    // Matching rollout record marking the same build as draining.
    WorkerBuildIdRollout::query()->create([
        'namespace' => 'default',
        'task_queue' => $queue,
        'build_id' => $buildId,
        'drain_intent' => 'draining',
        'drained_at' => now(),
    ]);

    $payload = [
        'workflow_id' => 'wf-drain-error',
        'workflow_type' => $workflowType,
        'task_queue' => $queue,
    ];

    $response = $this->postJson('/api/workflows', $payload, $this->controlPlaneHeadersWithWorkerProtocol());

    // Envelope: control-plane versioned, nothing from the worker protocol.
    $response->assertStatus(409);
    $response->assertHeader(ControlPlaneProtocol::HEADER, ControlPlaneProtocol::VERSION);
    $response->assertHeaderMissing(WorkerProtocol::HEADER);
    $response->assertJsonMissingPath('protocol_version');
    $response->assertJsonMissingPath('server_capabilities');

    // Body: machine-readable reason plus routing diagnostics.
    $response->assertJsonPath('reason', 'task_queue_draining');
    $response->assertJsonPath('task_queue', $queue);
    $response->assertJsonPath('routing_status', 'draining');
    $response->assertJsonPath('draining_build_ids.0', $buildId);
    $response->assertJsonPath(
        'message',
        'Task queue [drain-queue] is draining and cannot accept new workflow starts until an active worker cohort is available.',
    );
}
302+
249303
/**
250304
* @param array<string, mixed> $body
251305
* @param array<string, string> $headers

tests/Feature/WorkflowControlPlaneTest.php

Lines changed: 119 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,20 +2,22 @@
22

33
namespace Tests\Feature;
44

5+
use App\Models\WorkerBuildIdRollout;
6+
use App\Models\WorkerRegistration;
57
use App\Models\WorkflowNamespace;
68
use Illuminate\Foundation\Testing\RefreshDatabase;
7-
use Illuminate\Support\Str;
89
use Illuminate\Support\Facades\Queue;
10+
use Illuminate\Support\Str;
911
use Tests\Fixtures\AwaitApprovalWorkflow;
10-
use Tests\Fixtures\InternalParentWorkflow;
11-
use Tests\Fixtures\InternalChildWorkflow;
1212
use Tests\Fixtures\InteractiveCommandWorkflow;
13+
use Tests\Fixtures\InternalChildWorkflow;
14+
use Tests\Fixtures\InternalParentWorkflow;
1315
use Tests\TestCase;
1416
use Workflow\V2\Contracts\WorkflowControlPlane;
1517
use Workflow\V2\Jobs\RunWorkflowTask;
1618
use Workflow\V2\Models\WorkflowCommand;
17-
use Workflow\V2\Models\WorkflowInstance;
1819
use Workflow\V2\Models\WorkflowHistoryEvent;
20+
use Workflow\V2\Models\WorkflowInstance;
1921
use Workflow\V2\Models\WorkflowLink;
2022
use Workflow\V2\Models\WorkflowRunLineageEntry;
2123
use Workflow\V2\Models\WorkflowTask;
@@ -278,6 +280,119 @@ public function test_start_rejects_cross_namespace_workflow_id_without_leaking_t
278280
$this->assertStringContainsString('another namespace', $message);
279281
}
280282

283+
/**
 * With only a draining worker and a draining rollout on the queue, a start
 * request is rejected with 409 and full routing diagnostics, and no workflow
 * instance is persisted.
 */
public function test_start_blocks_drained_task_queue_without_an_active_worker_cohort(): void
{
    Queue::fake();

    $this->configureWorkflowTypes();
    $this->createNamespace('default', 'Default namespace');

    $workflowId = 'wf-drained-start';
    $workflowType = 'tests.await-approval-workflow';
    $queue = 'drain-queue';
    $buildId = 'build-draining';

    // The queue's only worker has a fresh heartbeat but is draining.
    WorkerRegistration::query()->create([
        'worker_id' => 'draining-worker',
        'namespace' => 'default',
        'task_queue' => $queue,
        'runtime' => 'php',
        'sdk_version' => '1.0.0',
        'build_id' => $buildId,
        'supported_workflow_types' => [$workflowType],
        'workflow_definition_fingerprints' => [],
        'supported_activity_types' => [],
        'max_concurrent_workflow_tasks' => 100,
        'max_concurrent_activity_tasks' => 100,
        'last_heartbeat_at' => now(),
        'status' => 'draining',
    ]);

    WorkerBuildIdRollout::query()->create([
        'namespace' => 'default',
        'task_queue' => $queue,
        'build_id' => $buildId,
        'drain_intent' => 'draining',
        'drained_at' => now(),
    ]);

    $headers = $this->apiHeaders();
    $response = $this->withHeaders($headers)->postJson('/api/workflows', [
        'workflow_id' => $workflowId,
        'workflow_type' => $workflowType,
        'task_queue' => $queue,
    ]);

    $response->assertStatus(409);

    // The request is echoed back alongside the block reason.
    $response->assertJsonPath('workflow_id', $workflowId);
    $response->assertJsonPath('workflow_type', $workflowType);
    $response->assertJsonPath('task_queue', $queue);
    $response->assertJsonPath('reason', 'task_queue_draining');

    // Routing diagnostics reported by the gate.
    $response->assertJsonPath('routing_status', 'draining');
    $response->assertJsonPath('active_worker_count', 0);
    $response->assertJsonPath('draining_worker_count', 1);
    $response->assertJsonPath('stale_worker_count', 0);
    $response->assertJsonPath('draining_build_ids.0', $buildId);
    $response->assertJsonPath('drain_intent', 'draining');

    // The rejected start must not leave an instance behind.
    $instanceExists = WorkflowInstance::query()->whereKey($workflowId)->exists();
    $this->assertFalse($instanceExists);
}
335+
336+
/**
 * A queue that still has a fresh active worker accepts new starts even
 * though another worker and a build-id rollout on it are draining.
 */
public function test_start_allows_queue_with_active_and_draining_worker_cohorts(): void
{
    Queue::fake();

    $this->configureWorkflowTypes();
    $this->createNamespace('default', 'Default namespace');

    $workflowId = 'wf-mixed-drain-start';
    $workflowType = 'tests.await-approval-workflow';
    $queue = 'mixed-queue';

    // Active cohort first, then the draining one — same order as before.
    $cohorts = [
        ['worker_id' => 'active-worker', 'build_id' => 'build-active', 'status' => 'active'],
        ['worker_id' => 'draining-worker', 'build_id' => 'build-draining', 'status' => 'draining'],
    ];

    foreach ($cohorts as $cohort) {
        WorkerRegistration::query()->create([
            'worker_id' => $cohort['worker_id'],
            'namespace' => 'default',
            'task_queue' => $queue,
            'runtime' => 'php',
            'sdk_version' => '1.0.0',
            'build_id' => $cohort['build_id'],
            'supported_workflow_types' => [$workflowType],
            'workflow_definition_fingerprints' => [],
            'supported_activity_types' => [],
            'max_concurrent_workflow_tasks' => 100,
            'max_concurrent_activity_tasks' => 100,
            'last_heartbeat_at' => now(),
            'status' => $cohort['status'],
        ]);
    }

    WorkerBuildIdRollout::query()->create([
        'namespace' => 'default',
        'task_queue' => $queue,
        'build_id' => 'build-draining',
        'drain_intent' => 'draining',
        'drained_at' => now(),
    ]);

    $headers = $this->apiHeaders();
    $response = $this->withHeaders($headers)->postJson('/api/workflows', [
        'workflow_id' => $workflowId,
        'workflow_type' => $workflowType,
        'task_queue' => $queue,
    ]);

    $response->assertCreated();
    $response->assertJsonPath('workflow_id', $workflowId);
    $response->assertJsonPath('workflow_type', $workflowType);
    $response->assertJsonPath('outcome', 'started_new');
}
395+
281396
public function test_signal_is_scoped_by_namespace(): void
282397
{
283398
Queue::fake();

0 commit comments

Comments
 (0)