Skip to content

Commit 0a1420f

Browse files
Surface rejected workflow starts in control-plane responses
Surface rejected workflow starts in control-plane responses
1 parent 013d195 commit 0a1420f

8 files changed

Lines changed: 347 additions & 3 deletions

app/Http/Controllers/Api/BridgeAdapterController.php

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
use App\Support\ControlPlaneProtocol;
77
use App\Support\NamespaceExternalPayloadStorage;
88
use App\Support\NamespaceWorkflowScope;
9+
use App\Support\TaskQueueRoutingGate;
910
use App\Support\WorkflowCommandContextFactory;
1011
use App\Support\WorkflowStartService;
1112
use Illuminate\Http\JsonResponse;
@@ -22,6 +23,7 @@ class BridgeAdapterController
2223
public function __construct(
2324
private readonly WorkflowStartService $workflowStartService,
2425
private readonly WorkflowControlPlane $workflowControlPlane,
26+
private readonly TaskQueueRoutingGate $taskQueueRoutingGate,
2527
private readonly WorkflowCommandContextFactory $commandContexts,
2628
private readonly NamespaceExternalPayloadStorage $externalPayloadStorage,
2729
) {}
@@ -111,6 +113,33 @@ private function startWorkflow(
111113
$workflowId = is_string($startTarget['workflow_id'] ?? null)
112114
? $startTarget['workflow_id']
113115
: $this->workflowIdFor($adapter, $idempotencyKey);
116+
$taskQueue = is_string($startTarget['task_queue'] ?? null)
117+
? trim($startTarget['task_queue'])
118+
: null;
119+
120+
if ($taskQueue !== null && $taskQueue !== '') {
121+
$routingBlock = $this->taskQueueRoutingGate->workflowStartBlock((string) $namespace, $taskQueue);
122+
123+
if ($routingBlock !== null) {
124+
return $this->rejected($request, $adapter, 'start_workflow', $idempotencyKey, 'task_queue_draining', array_filter([
125+
'message' => sprintf(
126+
'Task queue [%s] is draining and cannot accept new workflow starts until an active worker cohort is available.',
127+
$taskQueue,
128+
),
129+
'target' => $this->redactedTarget($target + ['workflow_id' => $workflowId]),
130+
'correlation' => $correlation,
131+
'workflow_id' => $workflowId,
132+
'workflow_type' => $startTarget['workflow_type'],
133+
'task_queue' => $taskQueue,
134+
'routing_status' => $routingBlock['routing_status'],
135+
'active_worker_count' => $routingBlock['active_worker_count'],
136+
'draining_worker_count' => $routingBlock['draining_worker_count'],
137+
'stale_worker_count' => $routingBlock['stale_worker_count'],
138+
'draining_build_ids' => $routingBlock['draining_build_ids'],
139+
'drain_intent' => 'draining',
140+
], static fn (mixed $value): bool => $value !== null));
141+
}
142+
}
114143

115144
try {
116145
$start = $this->workflowStartService->start(
@@ -150,8 +179,29 @@ private function startWorkflow(
150179

151180
NamespaceWorkflowScope::bind($namespace, $start['workflow_id'], $start['workflow_type']);
152181

182+
$started = (bool) ($start['started'] ?? false);
153183
$duplicate = $start['outcome'] === 'returned_existing_active';
154184

185+
if (! $started && ! $duplicate) {
186+
return $this->rejected(
187+
$request,
188+
$adapter,
189+
'start_workflow',
190+
$idempotencyKey,
191+
$start['rejection_reason'] ?? $start['reason'] ?? 'unsupported_routing',
192+
array_filter([
193+
'message' => $start['message'] ?? null,
194+
'target' => $this->redactedTarget($target + ['workflow_id' => $start['workflow_id']]),
195+
'correlation' => $correlation,
196+
'workflow_id' => $start['workflow_id'],
197+
'run_id' => $start['run_id'],
198+
'workflow_type' => $start['workflow_type'],
199+
'rejection_reason' => $start['rejection_reason'] ?? null,
200+
'control_plane_outcome' => $start['outcome'],
201+
], static fn (mixed $value): bool => $value !== null),
202+
);
203+
}
204+
155205
return $this->outcome($request, [
156206
'adapter' => $adapter,
157207
'action' => 'start_workflow',

app/Http/Controllers/Api/WorkflowController.php

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -221,6 +221,7 @@ public function start(Request $request): JsonResponse
221221
}
222222

223223
$workflowId = $start['workflow_id'];
224+
$started = (bool) ($start['started'] ?? false);
224225

225226
NamespaceWorkflowScope::bind(
226227
$namespace,
@@ -239,6 +240,11 @@ public function start(Request $request): JsonResponse
239240
'business_key' => $run?->business_key,
240241
'payload_codec' => $run?->payload_codec,
241242
'outcome' => $start['outcome'],
243+
'command_status' => $started ? 'accepted' : 'rejected',
244+
'command_source' => 'control_plane',
245+
'reason' => $start['reason'],
246+
'rejection_reason' => $start['rejection_reason'],
247+
'message' => $start['message'],
242248
], $this->startStatusCode($start['outcome']));
243249
}
244250

app/Support/BridgeAdapterOutcomeContract.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,12 @@ public static function manifest(): array
7272
'idempotency_key',
7373
'target',
7474
'correlation',
75+
'workflow_id',
76+
'run_id',
77+
'workflow_type',
78+
'control_plane_outcome',
79+
'rejection_reason',
80+
'message',
7581
],
7682
'redaction' => [
7783
'never_echo' => ['authorization', 'signature', 'token', 'secret', 'raw_payload'],
@@ -109,6 +115,9 @@ public static function manifest(): array
109115
'auth_failed',
110116
'malformed_payload',
111117
'duplicate_start',
118+
'instance_already_started',
119+
'compatibility_blocked',
120+
'task_queue_draining',
112121
'unsupported_routing',
113122
'unsupported_action',
114123
'payload_too_large',
@@ -195,6 +204,12 @@ public static function manifest(): array
195204
'reason' => 'duplicate_start',
196205
'control_plane_outcome' => 'returned_existing_active',
197206
],
207+
'incompatible_fleet' => [
208+
'http_status' => 422,
209+
'outcome' => 'rejected',
210+
'reason' => 'compatibility_blocked',
211+
'control_plane_outcome' => 'rejected_compatibility_blocked',
212+
],
198213
'unconfigured_workflow_type' => [
199214
'http_status' => 422,
200215
'outcome' => 'rejected',

app/Support/WorkflowStartService.php

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,11 +19,14 @@ public function __construct(
1919
/**
2020
* @param array<string, mixed> $validated
2121
* @return array{
22+
* started: bool,
2223
* workflow_id: string,
2324
* run_id: string|null,
2425
* workflow_type: string,
2526
* outcome: string|null,
2627
* reason: string|null,
28+
* rejection_reason: string|null,
29+
* message: string|null,
2730
* }
2831
*/
2932
public function start(
@@ -43,21 +46,27 @@ public function start(
4346
/**
4447
* @param array<string, mixed> $validated
4548
* @return array{
49+
* started: bool,
4650
* workflow_id: string,
4751
* run_id: string|null,
4852
* workflow_type: string,
4953
* outcome: string|null,
5054
* reason: string|null,
55+
* rejection_reason: string|null,
56+
* message: string|null,
5157
* }
5258
*/
5359
/**
5460
* @param array<string, mixed> $validated
5561
* @return array{
62+
* started: bool,
5663
* workflow_id: string,
5764
* run_id: string|null,
5865
* workflow_type: string,
5966
* outcome: string|null,
6067
* reason: string|null,
68+
* rejection_reason: string|null,
69+
* message: string|null,
6170
* }
6271
*/
6372
private function startRemoteWorkflow(
@@ -99,12 +108,23 @@ private function startRemoteWorkflow(
99108
'command_context' => $commandContext,
100109
], static fn (mixed $value): bool => $value !== null));
101110

111+
$started = (bool) ($result['started'] ?? false);
112+
$reason = isset($result['reason']) && is_string($result['reason'])
113+
? $result['reason']
114+
: null;
115+
$message = isset($result['message']) && is_string($result['message'])
116+
? $result['message']
117+
: null;
118+
102119
return [
120+
'started' => $started,
103121
'workflow_id' => $result['workflow_instance_id'],
104122
'run_id' => $result['workflow_run_id'],
105123
'workflow_type' => $result['workflow_type'],
106124
'outcome' => $result['outcome'],
107-
'reason' => $result['reason'],
125+
'reason' => $reason,
126+
'rejection_reason' => $started ? null : $reason,
127+
'message' => $message,
108128
];
109129
}
110130

tests/Feature/BridgeAdapterControllerTest.php

Lines changed: 117 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,12 +2,16 @@
22

33
namespace Tests\Feature;
44

5+
use App\Models\WorkerBuildIdRollout;
6+
use App\Models\WorkerRegistration;
57
use Illuminate\Foundation\Testing\RefreshDatabase;
68
use Illuminate\Support\Facades\Queue;
79
use Tests\Feature\Concerns\ServerTestHelpers;
810
use Tests\Fixtures\InteractiveCommandWorkflow;
911
use Tests\TestCase;
1012
use Workflow\V2\Models\WorkflowCommand;
13+
use Workflow\V2\Models\WorkflowRun;
14+
use Workflow\V2\Support\WorkerCompatibilityFleet;
1115

1216
class BridgeAdapterControllerTest extends TestCase
1317
{
@@ -22,6 +26,14 @@ protected function setUp(): void
2226
$this->configureWorkflowTypes([
2327
'tests.interactive-command-workflow' => InteractiveCommandWorkflow::class,
2428
]);
29+
WorkerCompatibilityFleet::clear();
30+
}
31+
32+
protected function tearDown(): void
33+
{
34+
WorkerCompatibilityFleet::clear();
35+
36+
parent::tearDown();
2537
}
2638

2739
public function test_webhook_bridge_starts_workflow_and_dedupes_by_provider_event(): void
@@ -229,4 +241,109 @@ public function test_webhook_bridge_uses_named_rejections(): void
229241
->assertJsonPath('reason', 'unsupported_action')
230242
->assertJsonPath('action', 'not_supported');
231243
}
244+
245+
public function test_webhook_bridge_blocks_drained_task_queue_starts_without_an_active_worker_cohort(): void
246+
{
247+
Queue::fake();
248+
249+
WorkerRegistration::query()->create([
250+
'worker_id' => 'draining-worker',
251+
'namespace' => 'default',
252+
'task_queue' => 'drain-queue',
253+
'runtime' => 'php',
254+
'sdk_version' => '1.0.0',
255+
'build_id' => 'build-draining',
256+
'supported_workflow_types' => ['tests.interactive-command-workflow'],
257+
'workflow_definition_fingerprints' => [],
258+
'supported_activity_types' => [],
259+
'max_concurrent_workflow_tasks' => 100,
260+
'max_concurrent_activity_tasks' => 100,
261+
'last_heartbeat_at' => now(),
262+
'status' => 'draining',
263+
]);
264+
265+
WorkerBuildIdRollout::query()->create([
266+
'namespace' => 'default',
267+
'task_queue' => 'drain-queue',
268+
'build_id' => 'build-draining',
269+
'drain_intent' => 'draining',
270+
'drained_at' => now(),
271+
]);
272+
273+
$response = $this->withHeaders($this->apiHeaders())
274+
->postJson('/api/bridge-adapters/webhook/stripe', [
275+
'action' => 'start_workflow',
276+
'idempotency_key' => 'stripe-event-drain-1',
277+
'target' => [
278+
'workflow_id' => 'wf-bridge-drained-start',
279+
'workflow_type' => 'tests.interactive-command-workflow',
280+
'task_queue' => 'drain-queue',
281+
],
282+
]);
283+
284+
$response->assertStatus(422)
285+
->assertJsonPath('adapter', 'stripe')
286+
->assertJsonPath('action', 'start_workflow')
287+
->assertJsonPath('accepted', false)
288+
->assertJsonPath('outcome', 'rejected')
289+
->assertJsonPath('reason', 'task_queue_draining')
290+
->assertJsonPath('workflow_id', 'wf-bridge-drained-start')
291+
->assertJsonPath('workflow_type', 'tests.interactive-command-workflow')
292+
->assertJsonPath('task_queue', 'drain-queue')
293+
->assertJsonPath('routing_status', 'draining')
294+
->assertJsonPath('active_worker_count', 0)
295+
->assertJsonPath('draining_worker_count', 1)
296+
->assertJsonPath('stale_worker_count', 0)
297+
->assertJsonPath('draining_build_ids.0', 'build-draining')
298+
->assertJsonPath('drain_intent', 'draining')
299+
->assertJsonPath(
300+
'message',
301+
'Task queue [drain-queue] is draining and cannot accept new workflow starts until an active worker cohort is available.',
302+
);
303+
304+
$this->assertFalse(WorkflowRun::query()->exists());
305+
}
306+
307+
public function test_webhook_bridge_surfaces_fail_closed_start_rejection_detail(): void
308+
{
309+
Queue::fake();
310+
311+
config()->set('queue.default', 'redis');
312+
config()->set('queue.connections.redis.driver', 'redis');
313+
config()->set('workflows.v2.compatibility.current', 'build-a');
314+
config()->set('workflows.v2.compatibility.supported', ['build-a']);
315+
config()->set('workflows.v2.fleet.validation_mode', 'fail');
316+
317+
WorkerCompatibilityFleet::record(['build-b'], 'redis', 'default', 'worker-build-b');
318+
319+
$response = $this->withHeaders($this->apiHeaders())
320+
->postJson('/api/bridge-adapters/webhook/stripe', [
321+
'action' => 'start_workflow',
322+
'idempotency_key' => 'stripe-event-compat-1',
323+
'target' => [
324+
'workflow_id' => 'wf-bridge-compatibility-blocked',
325+
'workflow_type' => 'tests.interactive-command-workflow',
326+
],
327+
]);
328+
329+
$response->assertStatus(422)
330+
->assertJsonPath('adapter', 'stripe')
331+
->assertJsonPath('action', 'start_workflow')
332+
->assertJsonPath('accepted', false)
333+
->assertJsonPath('outcome', 'rejected')
334+
->assertJsonPath('reason', 'compatibility_blocked')
335+
->assertJsonPath('rejection_reason', 'compatibility_blocked')
336+
->assertJsonPath('workflow_id', 'wf-bridge-compatibility-blocked')
337+
->assertJsonPath('run_id', null)
338+
->assertJsonPath('workflow_type', 'tests.interactive-command-workflow')
339+
->assertJsonPath('control_plane_outcome', 'rejected_compatibility_blocked')
340+
->assertJsonPath(
341+
'message',
342+
'Workflow instance [wf-bridge-compatibility-blocked] cannot start. Start blocked under fail validation mode. '
343+
.'No active worker heartbeat advertises compatibility [build-a]. '
344+
.'Active workers there advertise [build-b].',
345+
);
346+
347+
$this->assertSame(0, WorkflowRun::query()->count());
348+
}
232349
}

0 commit comments

Comments
 (0)