Skip to content

Commit d4622f4

Browse files
Block workflow traffic until bootstrap blockers clear
Block workflow traffic until bootstrap blockers clear
1 parent 23418d6 commit d4622f4

6 files changed

Lines changed: 260 additions & 3 deletions

File tree

README.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -705,6 +705,12 @@ of pretending to be interchangeable HTTP peers. `GET /api/cluster/info`,
705705
`/api/health`, and `/api/ready` stay available for discovery and liveness even
706706
on scheduler-only, execution-only, or matching-only nodes.
707707

708+
Those runtime-serving workflow routes also fail closed on bootstrap blockers.
709+
If database connectivity or workflow-table migrations are not ready, hosted
710+
workflow and worker-protocol routes return `503` with
711+
`reason: "workflow_v2_blocked"` plus `blocked_by` and `remediation` instead of
712+
accepting traffic that depends on an incomplete rollout state.
713+
708714
The same `GET /api/cluster/info` response now includes a versioned
709715
`coordination_health` manifest for rollout-safety coordination risk. It
710716
summarizes the current server-wide workflow v2 health status, warning and error
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
<?php
2+
3+
namespace App\Http\Middleware;
4+
5+
use App\Support\ControlPlaneProtocol;
6+
use App\Support\ServerReadiness;
7+
use App\Support\WorkerProtocol;
8+
use Closure;
9+
use Illuminate\Http\JsonResponse;
10+
use Illuminate\Http\Request;
11+
use Symfony\Component\HttpFoundation\Response;
12+
13+
/**
 * Fails workflow v2 routes closed while the server reports bootstrap blockers.
 *
 * A request is let through unless the readiness status is `blocked` AND at
 * least one non-empty string blocker is listed. Otherwise a 503
 * `workflow_v2_blocked` error is returned through the worker-plane or
 * control-plane protocol helper, depending on the request.
 */
class RequireWorkflowBootstrapReady
{
    public function __construct(
        private readonly ServerReadiness $readiness,
    ) {}

    /**
     * Forward the request, or short-circuit with a 503 when bootstrap is blocked.
     */
    public function handle(Request $request, Closure $next): Response
    {
        $status = $this->readiness->workflowStatus();

        // Keep only usable blocker names: non-empty strings, re-indexed as a list.
        $blockedBy = [];
        if (is_array($status['blocked_by'] ?? null)) {
            foreach ($status['blocked_by'] as $candidate) {
                if (is_string($candidate) && $candidate !== '') {
                    $blockedBy[] = $candidate;
                }
            }
        }

        // A "blocked" status without any named blocker is not treated as blocking.
        $isBlocked = ($status['status'] ?? null) === 'blocked' && $blockedBy !== [];
        if (! $isBlocked) {
            return $next($request);
        }

        $details = ['blocked_by' => $blockedBy];
        if (is_string($status['remediation'] ?? null)) {
            $details['remediation'] = $status['remediation'];
        }

        return self::error(
            $request,
            503,
            'workflow_v2_blocked',
            'This node is not ready to serve workflow v2 traffic until bootstrap blockers are cleared.',
            $details,
        );
    }

    /**
     * Build the error response in the envelope that matches the request's plane.
     *
     * `reason` and `message` take precedence over same-named keys in `$extra`;
     * entries whose value is null are dropped from the payload.
     *
     * @param array<string, mixed> $extra
     */
    private static function error(Request $request, int $status, string $reason, string $message, array $extra = []): JsonResponse
    {
        $payload = array_filter(
            ['reason' => $reason, 'message' => $message] + $extra,
            static fn (mixed $value): bool => $value !== null,
        );

        return WorkerProtocol::isWorkerPlaneRequest($request)
            ? WorkerProtocol::json($payload, $status)
            : ControlPlaneProtocol::jsonForRequest($request, $payload, $status);
    }
}

app/Support/RemoteScheduleStarter.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ final class RemoteScheduleStarter implements ScheduleWorkflowStarter
1616
{
1717
public function __construct(
1818
private readonly WorkflowStartService $startService,
19+
private readonly ServerReadiness $readiness,
1920
) {}
2021

2122
public function start(
@@ -24,6 +25,20 @@ public function start(
2425
string $outcome,
2526
?string $effectiveOverlapPolicy = null,
2627
): ScheduleStartResult {
28+
$workflowStatus = $this->readiness->workflowStatus();
29+
$blockedBy = is_array($workflowStatus['blocked_by'] ?? null)
30+
? array_values(array_filter($workflowStatus['blocked_by'], static fn (mixed $value): bool => is_string($value) && $value !== ''))
31+
: [];
32+
33+
if (($workflowStatus['status'] ?? null) === 'blocked' && $blockedBy !== []) {
34+
throw new WorkflowExecutionUnavailableException(
35+
'schedule_start',
36+
$schedule->schedule_id,
37+
'workflow_v2_blocked',
38+
'Workflow v2 bootstrap blockers must clear before scheduled workflows can start.',
39+
);
40+
}
41+
2742
$action = WorkflowSchedule::normalizeActionTimeouts(
2843
is_array($schedule->action) ? $schedule->action : [],
2944
);

routes/api.php

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
use App\Http\Middleware\ControlPlaneVersionResolver;
1919
use App\Http\Middleware\NamespaceResolver;
2020
use App\Http\Middleware\RequireRole;
21+
use App\Http\Middleware\RequireWorkflowBootstrapReady;
2122
use App\Http\Middleware\RequireTopologyRoles;
2223
use App\Http\Middleware\WorkerProtocolVersionResolver;
2324
use Illuminate\Support\Facades\Route;
@@ -51,6 +52,11 @@
5152
// NamespaceResolver on hosted routes so wrong-node requests fail closed with a
5253
// machine-readable topology reason without leaking namespace existence.
5354
//
55+
// RequireWorkflowBootstrapReady sits in the same slot for runtime-serving
56+
// workflow routes. It only blocks on explicit database/migration bootstrap
57+
// blockers so routes fail closed during rollout/schema drift without locking
58+
// out recovery paths such as compatible worker registration.
59+
//
5460
// WorkerProtocolVersionResolver follows the same ordering for worker-plane
5561
// routes, keeping protocol skew and namespace errors in the worker envelope.
5662
Route::middleware([Authenticate::class])->group(function () {
@@ -62,6 +68,7 @@
6268
$cpv = ControlPlaneVersionResolver::class;
6369
$httpControl = RequireTopologyRoles::class.':api_ingress,control_plane';
6470
$httpWorker = RequireTopologyRoles::class.':api_ingress,control_plane';
71+
$workflowBootstrap = RequireWorkflowBootstrapReady::class;
6572
$wpv = WorkerProtocolVersionResolver::class;
6673

6774
// ── System ───────────────────────────────────────────────────────
@@ -82,7 +89,7 @@
8289
});
8390

8491
// ── Workflows ────────────────────────────────────────────────────
85-
Route::prefix('workflows')->middleware([$operator, $cpv, $httpControl, $ns])->group(function () {
92+
Route::prefix('workflows')->middleware([$operator, $cpv, $httpControl, $workflowBootstrap, $ns])->group(function () {
8693
Route::get('/', [WorkflowController::class, 'index']);
8794
Route::post('/', [WorkflowController::class, 'start']);
8895
Route::get('/{workflowId}', [WorkflowController::class, 'show']);
@@ -113,12 +120,12 @@
113120
});
114121

115122
// ── Bridge Adapters ──────────────────────────────────────────────
116-
Route::prefix('bridge-adapters')->middleware([$operator, $cpv, $httpControl, $ns])->group(function () {
123+
Route::prefix('bridge-adapters')->middleware([$operator, $cpv, $httpControl, $workflowBootstrap, $ns])->group(function () {
117124
Route::post('/webhook/{adapter}', [BridgeAdapterController::class, 'webhook']);
118125
});
119126

120127
// ── Worker Task Polling ──────────────────────────────────────────
121-
Route::prefix('worker')->middleware([$worker, $wpv, $httpWorker, $ns])->group(function () {
128+
Route::prefix('worker')->middleware([$worker, $wpv, $httpWorker, $workflowBootstrap, $ns])->group(function () {
122129
// Registration
123130
Route::post('/register', [WorkerController::class, 'register']);
124131
Route::post('/heartbeat', [WorkerController::class, 'heartbeat']);

tests/Feature/ScheduleEvaluateTest.php

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -223,6 +223,34 @@ public function test_it_records_rollout_safety_start_rejections_as_skips(): void
223223
$this->assertSame([], $schedule->recent_actions ?? []);
224224
}
225225

226+
/**
 * A due schedule must be recorded as a skip with reason `workflow_v2_blocked`,
 * not as a fire or a failure, while a workflow migration record is missing.
 */
public function test_it_skips_due_schedules_when_workflow_bootstrap_is_blocked(): void
{
    $scheduleId = 'skip-bootstrap-blocked';
    $missingMigration = '2026_04_21_000300_add_workflow_definition_fingerprints_to_worker_registrations';

    // Remove the migration's bookkeeping row (presumably this makes the
    // readiness check report it as pending — see the asserted skip reason).
    DB::table('migrations')
        ->where('migration', $missingMigration)
        ->delete();

    WorkflowSchedule::create([
        'schedule_id' => $scheduleId,
        'namespace' => 'default',
        'spec' => ['cron_expressions' => ['* * * * *']],
        'action' => ['workflow_type' => 'TestWorkflow'],
        'next_fire_at' => now()->subMinute(),
    ]);

    // Kept as one chained statement so the pending command executes here,
    // before the schedule row is re-read below.
    $this->artisan('schedule:evaluate')
        ->assertExitCode(0)
        ->expectsOutputToContain('skipped');

    $schedule = WorkflowSchedule::where('schedule_id', $scheduleId)->firstOrFail();

    $this->assertSame('workflow_v2_blocked', $schedule->last_skip_reason);
    $this->assertSame(1, (int) $schedule->skipped_trigger_count);

    // Neither a fire nor a failure may be counted for a blocked start.
    foreach (['fires_count', 'failures_count'] as $counter) {
        $this->assertSame(0, (int) $schedule->{$counter});
    }

    $this->assertNull($schedule->last_fired_at);
    $this->assertNull($schedule->latest_workflow_instance_id);
    $this->assertSame([], $schedule->recent_actions ?? []);
}
253+
226254
// ── next_fire_at advancement ────────────────────────────────────
227255

228256
public function test_it_advances_next_fire_at_after_successful_fire(): void
Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Feature;
6+
7+
use App\Models\WorkflowNamespace;
8+
use App\Support\ControlPlaneProtocol;
9+
use App\Support\WorkerProtocol;
10+
use Illuminate\Foundation\Testing\RefreshDatabase;
11+
use Tests\TestCase;
12+
use Workflow\V2\Support\WorkerCompatibilityFleet;
13+
14+
/**
 * Feature tests for the bootstrap gate on hosted workflow and worker routes.
 *
 * Covers: control-plane and worker-plane routes answering 503
 * `workflow_v2_blocked` while a workflow migration record is missing, the gate
 * answering before namespace resolution, and worker registration still
 * succeeding when only fleet validation is set to fail mode.
 */
class WorkflowBootstrapRouteGatingTest extends TestCase
{
    use RefreshDatabase;

    /**
     * Migration whose bookkeeping row is removed to simulate a pending
     * workflow-table migration.
     */
    private const ROLLOUT_SAFETY_MIGRATION = '2026_04_21_000300_add_workflow_definition_fingerprints_to_worker_registrations';

    /**
     * Configure role-token auth and make sure the `default` namespace exists.
     */
    protected function setUp(): void
    {
        parent::setUp();

        config([
            'server.auth.driver' => 'token',
            'server.auth.token' => null,
            'server.auth.role_tokens' => [
                'worker' => 'worker-token',
                'operator' => 'operator-token',
                'admin' => 'admin-token',
            ],
            'server.auth.backward_compatible' => true,
        ]);

        WorkflowNamespace::query()->updateOrCreate(
            ['name' => 'default'],
            [
                'description' => 'Default namespace',
                'retention_days' => 30,
                'status' => 'active',
            ],
        );
    }

    public function test_pending_rollout_safety_migrations_block_control_plane_routes(): void
    {
        $this->blockWorkflowBootstrap();

        $this->withHeaders($this->controlHeaders('operator-token'))
            ->getJson('/api/workflows')
            ->assertStatus(503)
            ->assertHeader(ControlPlaneProtocol::HEADER, ControlPlaneProtocol::VERSION)
            ->assertJsonPath('reason', 'workflow_v2_blocked')
            ->assertJsonPath('blocked_by.0', 'migrations')
            ->assertJsonPath(
                'remediation',
                'Restore database connectivity and migrate the workflow tables before relying on workflow v2 rollout-safety health.',
            );
    }

    public function test_pending_rollout_safety_migrations_block_worker_protocol_routes(): void
    {
        $this->blockWorkflowBootstrap();

        $this->withHeaders($this->workerHeaders('worker-token'))
            ->postJson('/api/worker/register', [
                'worker_id' => 'bootstrap-blocked-worker',
                'task_queue' => 'default',
                'runtime' => 'python',
                'build_id' => 'build-a',
            ])
            ->assertStatus(503)
            ->assertHeader(WorkerProtocol::HEADER, WorkerProtocol::VERSION)
            ->assertJsonPath('reason', 'workflow_v2_blocked')
            ->assertJsonPath('blocked_by.0', 'migrations');
    }

    public function test_bootstrap_gate_runs_before_namespace_resolution_on_hosted_routes(): void
    {
        $this->blockWorkflowBootstrap();

        // An unknown namespace must still surface the bootstrap error, not a namespace error.
        $this->withHeaders($this->controlHeaders('operator-token', 'ghost-namespace'))
            ->getJson('/api/workflows')
            ->assertStatus(503)
            ->assertJsonPath('reason', 'workflow_v2_blocked')
            ->assertJsonMissing(['reason' => 'namespace_not_found']);
    }

    public function test_worker_registration_can_recover_fail_mode_compatibility_health(): void
    {
        config()->set('queue.default', 'redis');
        config()->set('queue.connections.redis.driver', 'redis');
        config()->set('workflows.v2.compatibility.current', 'build-a');
        config()->set('workflows.v2.compatibility.supported', ['build-a']);
        config()->set('workflows.v2.fleet.validation_mode', 'fail');

        WorkerCompatibilityFleet::clear();

        // The trailing clear() sits in `finally` so it also runs when an
        // assertion below throws (presumably it resets fleet state that could
        // otherwise carry over to later tests — confirm against the class).
        try {
            $this->withHeaders($this->workerHeaders('worker-token'))
                ->postJson('/api/worker/register', [
                    'worker_id' => 'build-a-worker',
                    'task_queue' => 'default',
                    'runtime' => 'python',
                    'build_id' => 'build-a',
                ])
                ->assertCreated()
                ->assertHeader(WorkerProtocol::HEADER, WorkerProtocol::VERSION)
                ->assertJsonPath('worker_id', 'build-a-worker')
                ->assertJsonPath('registered', true);
        } finally {
            WorkerCompatibilityFleet::clear();
        }
    }

    /**
     * Delete one migration's bookkeeping row; the gated routes are then
     * expected to report `migrations` as a blocker.
     */
    private function blockWorkflowBootstrap(): void
    {
        \Illuminate\Support\Facades\DB::table('migrations')
            ->where('migration', self::ROLLOUT_SAFETY_MIGRATION)
            ->delete();
    }

    /**
     * Headers for a worker-protocol request against the `default` namespace.
     *
     * @return array<string, string>
     */
    private function workerHeaders(string $token): array
    {
        return [
            'Authorization' => "Bearer {$token}",
            'X-Namespace' => 'default',
            WorkerProtocol::HEADER => WorkerProtocol::VERSION,
        ];
    }

    /**
     * Headers for a control-plane request against the given namespace.
     *
     * @return array<string, string>
     */
    private function controlHeaders(string $token, string $namespace = 'default'): array
    {
        return [
            'Authorization' => "Bearer {$token}",
            'X-Namespace' => $namespace,
            ControlPlaneProtocol::HEADER => ControlPlaneProtocol::VERSION,
        ];
    }
}

0 commit comments

Comments
 (0)