Skip to content

Commit e7aedb4

Browse files
Revert "Block default-queue starts on drained cohorts"
This reverts commit a1dc399.
1 parent a1dc399 commit e7aedb4

7 files changed

Lines changed: 56 additions & 298 deletions

app/Http/Controllers/Api/BridgeAdapterController.php

Lines changed: 26 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -113,40 +113,32 @@ private function startWorkflow(
113113
$workflowId = is_string($startTarget['workflow_id'] ?? null)
114114
? $startTarget['workflow_id']
115115
: $this->workflowIdFor($adapter, $idempotencyKey);
116-
117-
try {
118-
$taskQueue = $this->workflowStartService->resolveTaskQueue(
119-
$startTarget['workflow_type'],
120-
$startTarget['task_queue'] ?? null,
121-
);
122-
} catch (LogicException $exception) {
123-
return $this->rejected($request, $adapter, 'start_workflow', $idempotencyKey, 'unknown_target', [
124-
'message' => $exception->getMessage(),
125-
'target' => $this->redactedTarget($target),
126-
'correlation' => $correlation,
127-
]);
128-
}
129-
130-
$routingBlock = $this->taskQueueRoutingGate->workflowStartBlock((string) $namespace, $taskQueue);
131-
132-
if ($routingBlock !== null) {
133-
return $this->rejected($request, $adapter, 'start_workflow', $idempotencyKey, 'task_queue_draining', array_filter([
134-
'message' => sprintf(
135-
'Task queue [%s] is draining and cannot accept new workflow starts until an active worker cohort is available.',
136-
$taskQueue,
137-
),
138-
'target' => $this->redactedTarget($target + ['workflow_id' => $workflowId]),
139-
'correlation' => $correlation,
140-
'workflow_id' => $workflowId,
141-
'workflow_type' => $startTarget['workflow_type'],
142-
'task_queue' => $taskQueue,
143-
'routing_status' => $routingBlock['routing_status'],
144-
'active_worker_count' => $routingBlock['active_worker_count'],
145-
'draining_worker_count' => $routingBlock['draining_worker_count'],
146-
'stale_worker_count' => $routingBlock['stale_worker_count'],
147-
'draining_build_ids' => $routingBlock['draining_build_ids'],
148-
'drain_intent' => 'draining',
149-
], static fn (mixed $value): bool => $value !== null));
116+
$taskQueue = is_string($startTarget['task_queue'] ?? null)
117+
? trim($startTarget['task_queue'])
118+
: null;
119+
120+
if ($taskQueue !== null && $taskQueue !== '') {
121+
$routingBlock = $this->taskQueueRoutingGate->workflowStartBlock((string) $namespace, $taskQueue);
122+
123+
if ($routingBlock !== null) {
124+
return $this->rejected($request, $adapter, 'start_workflow', $idempotencyKey, 'task_queue_draining', array_filter([
125+
'message' => sprintf(
126+
'Task queue [%s] is draining and cannot accept new workflow starts until an active worker cohort is available.',
127+
$taskQueue,
128+
),
129+
'target' => $this->redactedTarget($target + ['workflow_id' => $workflowId]),
130+
'correlation' => $correlation,
131+
'workflow_id' => $workflowId,
132+
'workflow_type' => $startTarget['workflow_type'],
133+
'task_queue' => $taskQueue,
134+
'routing_status' => $routingBlock['routing_status'],
135+
'active_worker_count' => $routingBlock['active_worker_count'],
136+
'draining_worker_count' => $routingBlock['draining_worker_count'],
137+
'stale_worker_count' => $routingBlock['stale_worker_count'],
138+
'draining_build_ids' => $routingBlock['draining_build_ids'],
139+
'drain_intent' => 'draining',
140+
], static fn (mixed $value): bool => $value !== null));
141+
}
150142
}
151143

152144
try {
@@ -168,7 +160,6 @@ private function startWorkflow(
168160
'adapter' => $adapter,
169161
'action' => 'start_workflow',
170162
'idempotency_key' => $idempotencyKey,
171-
'task_queue' => $taskQueue,
172163
],
173164
),
174165
);

app/Http/Controllers/Api/WorkflowController.php

Lines changed: 25 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -172,36 +172,31 @@ public function start(Request $request): JsonResponse
172172
], 409);
173173
}
174174

175-
try {
176-
$taskQueue = $this->workflowStartService->resolveTaskQueue(
177-
$validated['workflow_type'],
178-
$validated['task_queue'] ?? null,
179-
);
180-
} catch (LogicException $exception) {
181-
throw ValidationException::withMessages([
182-
'workflow_type' => [$exception->getMessage()],
183-
]);
184-
}
185-
186-
$routingBlock = $this->taskQueueRoutingGate->workflowStartBlock((string) $namespace, $taskQueue);
175+
$taskQueue = isset($validated['task_queue']) && is_string($validated['task_queue'])
176+
? trim($validated['task_queue'])
177+
: null;
187178

188-
if ($routingBlock !== null) {
189-
return ControlPlaneProtocol::jsonForRequest($request, array_filter([
190-
'workflow_id' => $workflowId,
191-
'workflow_type' => $validated['workflow_type'],
192-
'task_queue' => $taskQueue,
193-
'message' => sprintf(
194-
'Task queue [%s] is draining and cannot accept new workflow starts until an active worker cohort is available.',
195-
$taskQueue,
196-
),
197-
'reason' => 'task_queue_draining',
198-
'routing_status' => $routingBlock['routing_status'],
199-
'active_worker_count' => $routingBlock['active_worker_count'],
200-
'draining_worker_count' => $routingBlock['draining_worker_count'],
201-
'stale_worker_count' => $routingBlock['stale_worker_count'],
202-
'draining_build_ids' => $routingBlock['draining_build_ids'],
203-
'drain_intent' => 'draining',
204-
], static fn (mixed $value): bool => $value !== null), 409);
179+
if ($taskQueue !== null && $taskQueue !== '') {
180+
$routingBlock = $this->taskQueueRoutingGate->workflowStartBlock((string) $namespace, $taskQueue);
181+
182+
if ($routingBlock !== null) {
183+
return ControlPlaneProtocol::jsonForRequest($request, array_filter([
184+
'workflow_id' => $workflowId,
185+
'workflow_type' => $validated['workflow_type'],
186+
'task_queue' => $taskQueue,
187+
'message' => sprintf(
188+
'Task queue [%s] is draining and cannot accept new workflow starts until an active worker cohort is available.',
189+
$taskQueue,
190+
),
191+
'reason' => 'task_queue_draining',
192+
'routing_status' => $routingBlock['routing_status'],
193+
'active_worker_count' => $routingBlock['active_worker_count'],
194+
'draining_worker_count' => $routingBlock['draining_worker_count'],
195+
'stale_worker_count' => $routingBlock['stale_worker_count'],
196+
'draining_build_ids' => $routingBlock['draining_build_ids'],
197+
'drain_intent' => 'draining',
198+
], static fn (mixed $value): bool => $value !== null), 409);
199+
}
205200
}
206201

207202
try {
@@ -214,7 +209,7 @@ public function start(Request $request): JsonResponse
214209
commandName: 'start',
215210
metadata: array_filter([
216211
'workflow_type' => $validated['workflow_type'],
217-
'task_queue' => $taskQueue,
212+
'task_queue' => $validated['task_queue'] ?? null,
218213
'duplicate_policy' => $validated['duplicate_policy'] ?? null,
219214
], static fn (mixed $value): bool => $value !== null),
220215
),

app/Support/ConfiguredWorkflowTypeValidator.php

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -7,30 +7,6 @@
77

88
final class ConfiguredWorkflowTypeValidator
99
{
10-
/**
11-
* @return class-string<Workflow>|null
12-
*/
13-
public function resolveWorkflowClass(string $workflowType): ?string
14-
{
15-
if (class_exists($workflowType) && is_subclass_of($workflowType, Workflow::class)) {
16-
return $workflowType;
17-
}
18-
19-
$configured = config('workflows.v2.types.workflows', []);
20-
21-
if (! is_array($configured) || ! array_key_exists($workflowType, $configured)) {
22-
return null;
23-
}
24-
25-
$workflowClass = $configured[$workflowType];
26-
27-
if (! is_string($workflowClass) || ! class_exists($workflowClass) || ! is_subclass_of($workflowClass, Workflow::class)) {
28-
return null;
29-
}
30-
31-
return $workflowClass;
32-
}
33-
3410
/**
3511
* @throws LogicException
3612
*/
@@ -49,7 +25,7 @@ public function validationMessage(string $workflowType): ?string
4925
return null;
5026
}
5127

52-
if ($this->resolveWorkflowClass($workflowType) !== null) {
28+
if (class_exists($workflowType) && is_subclass_of($workflowType, Workflow::class)) {
5329
return null;
5430
}
5531

app/Support/WorkflowStartService.php

Lines changed: 4 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
use Workflow\V2\CommandContext;
88
use Workflow\V2\Contracts\WorkflowControlPlane;
99
use Workflow\V2\Support\PayloadEnvelopeResolver;
10-
use Workflow\V2\Support\RoutingResolver;
11-
use Workflow\WorkflowMetadata;
1210

1311
class WorkflowStartService
1412
{
@@ -40,37 +38,9 @@ public function start(
4038
$workflowId = isset($validated['workflow_id']) && is_string($validated['workflow_id'])
4139
? $validated['workflow_id']
4240
: null;
43-
$taskQueue = $this->resolveTaskQueue($workflowType, $validated['task_queue'] ?? null);
44-
45-
return $this->startRemoteWorkflow(
46-
$workflowType,
47-
$workflowId,
48-
$taskQueue,
49-
$validated,
50-
$namespace,
51-
$commandContext,
52-
);
53-
}
54-
55-
public function resolveTaskQueue(string $workflowType, mixed $requestedTaskQueue = null): string
56-
{
5741
$this->workflowTypes->assertLoadable($workflowType);
5842

59-
if (is_string($requestedTaskQueue) && trim($requestedTaskQueue) !== '') {
60-
return trim($requestedTaskQueue);
61-
}
62-
63-
$workflowClass = $this->workflowTypes->resolveWorkflowClass($workflowType);
64-
65-
if ($workflowClass !== null) {
66-
$resolvedQueue = RoutingResolver::workflowQueue($workflowClass, new WorkflowMetadata([]));
67-
68-
if (is_string($resolvedQueue) && trim($resolvedQueue) !== '') {
69-
return trim($resolvedQueue);
70-
}
71-
}
72-
73-
return $this->defaultTaskQueue();
43+
return $this->startRemoteWorkflow($workflowType, $workflowId, $validated, $namespace, $commandContext);
7444
}
7545

7646
/**
@@ -102,7 +72,6 @@ public function resolveTaskQueue(string $workflowType, mixed $requestedTaskQueue
10272
private function startRemoteWorkflow(
10373
string $workflowType,
10474
?string $workflowId,
105-
string $taskQueue,
10675
array $validated,
10776
?string $namespace = null,
10877
?CommandContext $commandContext = null,
@@ -123,7 +92,9 @@ private function startRemoteWorkflow(
12392
$result = $this->controlPlane->start($workflowType, $workflowId, array_filter([
12493
'arguments' => $arguments,
12594
'payload_codec' => $payloadCodec,
126-
'queue' => $taskQueue,
95+
'queue' => isset($validated['task_queue']) && is_string($validated['task_queue'])
96+
? $validated['task_queue']
97+
: null,
12798
'business_key' => isset($validated['business_key']) && is_string($validated['business_key'])
12899
? $validated['business_key']
129100
: null,
@@ -185,19 +156,4 @@ private function intValue(array $validated, string $key): ?int
185156

186157
return is_int($value) ? $value : null;
187158
}
188-
189-
private function defaultTaskQueue(): string
190-
{
191-
$connection = config('queue.default');
192-
193-
if (! is_string($connection) || trim($connection) === '') {
194-
return 'default';
195-
}
196-
197-
$queue = config('queue.connections.'.trim($connection).'.queue', 'default');
198-
199-
return is_string($queue) && trim($queue) !== ''
200-
? trim($queue)
201-
: 'default';
202-
}
203159
}

tests/Feature/BridgeAdapterControllerTest.php

Lines changed: 0 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -304,71 +304,6 @@ public function test_webhook_bridge_blocks_drained_task_queue_starts_without_an_
304304
$this->assertFalse(WorkflowRun::query()->exists());
305305
}
306306

307-
public function test_webhook_bridge_rejects_start_when_the_implicit_default_queue_is_draining(): void
308-
{
309-
Queue::fake();
310-
311-
config()->set('queue.default', 'redis');
312-
config()->set('queue.connections.redis.driver', 'redis');
313-
config()->set('queue.connections.redis.queue', 'default');
314-
315-
WorkerRegistration::query()->create([
316-
'worker_id' => 'worker-default-draining',
317-
'namespace' => 'default',
318-
'task_queue' => 'default',
319-
'runtime' => 'php',
320-
'sdk_version' => '1.0.0',
321-
'build_id' => 'build-default-draining',
322-
'supported_workflow_types' => [],
323-
'workflow_definition_fingerprints' => [],
324-
'supported_activity_types' => [],
325-
'max_concurrent_workflow_tasks' => 100,
326-
'max_concurrent_activity_tasks' => 100,
327-
'last_heartbeat_at' => now(),
328-
'status' => 'draining',
329-
]);
330-
331-
WorkerBuildIdRollout::query()->create([
332-
'namespace' => 'default',
333-
'task_queue' => 'default',
334-
'build_id' => 'build-default-draining',
335-
'drain_intent' => 'draining',
336-
'drained_at' => now(),
337-
]);
338-
339-
$response = $this->withHeaders($this->apiHeaders())
340-
->postJson('/api/bridge-adapters/webhook/stripe', [
341-
'action' => 'start_workflow',
342-
'idempotency_key' => 'stripe-event-default-drain-1',
343-
'target' => [
344-
'workflow_id' => 'wf-bridge-drained-default-start',
345-
'workflow_type' => 'tests.interactive-command-workflow',
346-
],
347-
]);
348-
349-
$response->assertStatus(422)
350-
->assertJsonPath('adapter', 'stripe')
351-
->assertJsonPath('action', 'start_workflow')
352-
->assertJsonPath('accepted', false)
353-
->assertJsonPath('outcome', 'rejected')
354-
->assertJsonPath('reason', 'task_queue_draining')
355-
->assertJsonPath('workflow_id', 'wf-bridge-drained-default-start')
356-
->assertJsonPath('workflow_type', 'tests.interactive-command-workflow')
357-
->assertJsonPath('task_queue', 'default')
358-
->assertJsonPath('routing_status', 'draining')
359-
->assertJsonPath('active_worker_count', 0)
360-
->assertJsonPath('draining_worker_count', 1)
361-
->assertJsonPath('stale_worker_count', 0)
362-
->assertJsonPath('draining_build_ids.0', 'build-default-draining')
363-
->assertJsonPath('drain_intent', 'draining')
364-
->assertJsonPath(
365-
'message',
366-
'Task queue [default] is draining and cannot accept new workflow starts until an active worker cohort is available.',
367-
);
368-
369-
$this->assertFalse(WorkflowRun::query()->exists());
370-
}
371-
372307
public function test_webhook_bridge_surfaces_fail_closed_start_rejection_detail(): void
373308
{
374309
Queue::fake();

0 commit comments

Comments
 (0)