Skip to content

Commit b49b040

Browse files
Add drain and resume mutation for build_id rollout state
Operators could see per-build-id rollout state via GET /api/task-queues/{taskQueue}/build-ids, but there was no way to mark a build_id cohort as draining before removing the older workers. Without that mutation, rollouts had to rely on stopping each worker process by hand, which is easy to mis-sequence when rolling back a bad build or cutting over from unversioned to versioned workers. Add two POST endpoints on the same route prefix: - POST /api/task-queues/{taskQueue}/build-ids/drain - POST /api/task-queues/{taskQueue}/build-ids/resume Both take a {"build_id": "..."} body; passing null targets the unversioned cohort (the pre-rollout default). The call is idempotent, so repeated drains do not shift the recorded drained_at timestamp. Drain intent persists in a new workflow_worker_build_id_rollouts table keyed on (namespace, task_queue, build_id), with the empty string used as the sentinel for the unversioned cohort so the unique index behaves consistently across SQLite/MySQL/PostgreSQL/SQL Server (null is treated as distinct in unique indexes on several backends). Worker register and heartbeat paths now stamp the registration with status='draining' whenever the cohort carries drain_intent='draining', so an in-flight worker cannot clobber the operator intent by heartbeating itself back to active. Resuming a cohort clears drain_intent, wipes drained_at, and flips any already-draining worker rows back to active so the read endpoint reports honest state immediately. The read endpoint gains drain_intent and drained_at fields per cohort and continues to surface drained cohorts even after their worker rows are removed, so rollout state stays visible when workers are torn down. rollout_status reflects operator intent: an active cohort that has been drained reports active_with_draining, and a drained cohort with no live workers reports draining instead of stale_only.
The route is /operator/ scoped like the rest of the task-queues prefix, and both endpoints are covered by the control-plane version and operational-success contract tests.
1 parent 1fbedf8 commit b49b040

8 files changed

Lines changed: 686 additions & 4 deletions

app/Http/Controllers/Api/TaskQueueController.php

Lines changed: 162 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace App\Http\Controllers\Api;
44

5+
use App\Models\WorkerBuildIdRollout;
56
use App\Models\WorkerRegistration;
67
use App\Support\ControlPlaneProtocol;
78
use App\Support\TaskQueueAdmission;
@@ -149,20 +150,29 @@ public function buildIds(Request $request, string $taskQueue): JsonResponse
149150
}
150151
}
151152

153+
$rolloutMap = $this->rolloutsForTaskQueue($namespace, $taskQueue);
154+
152155
$buildIds = [];
153156
foreach ($groups as $group) {
154157
$runtimes = array_keys($group['runtimes']);
155158
sort($runtimes);
156159
$sdkVersions = array_keys($group['sdk_versions']);
157160
sort($sdkVersions);
158161

162+
$rolloutKey = WorkerBuildIdRollout::buildIdKey($group['build_id']);
163+
$rollout = $rolloutMap[$rolloutKey] ?? null;
164+
$drainIntent = $rollout?->drain_intent ?? WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE;
165+
159166
$buildIds[] = [
160167
'build_id' => $group['build_id'],
161168
'rollout_status' => $this->buildIdRolloutStatus(
162169
$group['active_worker_count'],
163170
$group['draining_worker_count'],
164171
$group['stale_worker_count'],
172+
$drainIntent,
165173
),
174+
'drain_intent' => $drainIntent,
175+
'drained_at' => $rollout?->drained_at?->toJSON(),
166176
'active_worker_count' => $group['active_worker_count'],
167177
'draining_worker_count' => $group['draining_worker_count'],
168178
'stale_worker_count' => $group['stale_worker_count'],
@@ -174,6 +184,35 @@ public function buildIds(Request $request, string $taskQueue): JsonResponse
174184
];
175185
}
176186

187+
// Surface rollout intent even for cohorts whose worker rows have
188+
// been removed: operators still need to see "this build_id is
189+
// marked draining" until they explicitly resume it.
190+
foreach ($rolloutMap as $key => $rollout) {
191+
if (isset($groups[$key === '' ? '__unversioned__' : $key])) {
192+
continue;
193+
}
194+
195+
$buildIds[] = [
196+
'build_id' => $rollout->publicBuildId(),
197+
'rollout_status' => $this->buildIdRolloutStatus(
198+
0,
199+
0,
200+
0,
201+
$rollout->drain_intent,
202+
),
203+
'drain_intent' => $rollout->drain_intent,
204+
'drained_at' => $rollout->drained_at?->toJSON(),
205+
'active_worker_count' => 0,
206+
'draining_worker_count' => 0,
207+
'stale_worker_count' => 0,
208+
'total_worker_count' => 0,
209+
'runtimes' => [],
210+
'sdk_versions' => [],
211+
'last_heartbeat_at' => null,
212+
'first_seen_at' => null,
213+
];
214+
}
215+
177216
usort($buildIds, function (array $a, array $b): int {
178217
$rankA = $this->buildIdRolloutRank($a);
179218
$rankB = $this->buildIdRolloutRank($b);
@@ -194,16 +233,137 @@ public function buildIds(Request $request, string $taskQueue): JsonResponse
194233
]);
195234
}
196235

197-
private function buildIdRolloutStatus(int $active, int $draining, int $stale): string
236+
/**
 * Operator endpoint: flag a build_id cohort of the task queue as draining.
 *
 * The request body carries `build_id`; a null value targets the unversioned
 * cohort. Delegates to setBuildIdDrainIntent(), which keeps the original
 * drained_at timestamp when the cohort is already draining, so repeating
 * the call is harmless.
 */
public function drainBuildId(Request $request, string $taskQueue): JsonResponse
{
    $intent = WorkerBuildIdRollout::DRAIN_INTENT_DRAINING;

    return $this->setBuildIdDrainIntent($request, $taskQueue, $intent);
}
250+
251+
/**
 * Operator endpoint: undo an earlier drain so the build_id cohort is marked
 * active again (the rollback path).
 *
 * The request body carries `build_id`; a null value targets the unversioned
 * cohort. Delegates to setBuildIdDrainIntent() with the "active" intent, so
 * resuming a cohort that is not draining simply leaves it active.
 */
public function resumeBuildId(Request $request, string $taskQueue): JsonResponse
{
    $intent = WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE;

    return $this->setBuildIdDrainIntent($request, $taskQueue, $intent);
}
264+
265+
/**
 * Persist the operator's drain intent for one build_id cohort of a task
 * queue and return the resulting rollout state.
 *
 * The whole read-modify-write runs in a single transaction with the rollout
 * row locked, so concurrent drain/resume calls for the same cohort are
 * serialized instead of racing each other (or colliding on the unique
 * index when the row does not exist yet).
 *
 * @param  Request  $request    Must contain a `build_id` key; null or a blank
 *                              string selects the unversioned cohort.
 * @param  string   $taskQueue  Task queue taken from the route.
 * @param  string   $intent     One of the WorkerBuildIdRollout::DRAIN_INTENT_* values.
 * @return JsonResponse         namespace, task_queue, build_id, drain_intent, drained_at.
 *
 * @throws \Illuminate\Validation\ValidationException when `build_id` is missing or malformed.
 */
private function setBuildIdDrainIntent(Request $request, string $taskQueue, string $intent): JsonResponse
{
    if ($response = ControlPlaneProtocol::rejectUnsupported($request)) {
        return $response;
    }

    $validated = $request->validate([
        'build_id' => ['present', 'nullable', 'string', 'max:255'],
    ]);

    $namespace = (string) $request->attributes->get('namespace');

    // Normalize through the model helpers so this endpoint, the read
    // endpoint and the worker register/heartbeat paths all agree on how a
    // build_id maps to its storage key (trimmed; blank => unversioned).
    $key = WorkerBuildIdRollout::buildIdKey($validated['build_id']);
    $publicBuildId = WorkerBuildIdRollout::buildIdFromKey($key);

    $scope = [
        'namespace' => $namespace,
        'task_queue' => $taskQueue,
        'build_id' => $key,
    ];
    $draining = $intent === WorkerBuildIdRollout::DRAIN_INTENT_DRAINING;

    $rollout = (new WorkerBuildIdRollout())->getConnection()->transaction(
        function () use ($scope, $intent, $draining, $namespace, $taskQueue, $publicBuildId): WorkerBuildIdRollout {
            // Make sure the row exists before locking it. firstOrCreate()
            // falls back to reading the row when a concurrent request wins
            // the insert, so a first-time race no longer surfaces as a
            // unique-constraint error.
            WorkerBuildIdRollout::query()->firstOrCreate($scope, [
                'drain_intent' => WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE,
            ]);

            $rollout = WorkerBuildIdRollout::query()
                ->where($scope)
                ->lockForUpdate()
                ->firstOrFail();

            // Idempotency: a repeated drain keeps the original timestamp.
            // A draining row that somehow lacks one gets stamped now.
            $keepTimestamp = $rollout->isDraining() && $rollout->drained_at !== null;

            $rollout->drain_intent = $intent;
            $rollout->drained_at = $draining
                ? ($keepTimestamp ? $rollout->drained_at : now())
                : null;
            $rollout->save();

            if (! $draining) {
                // Resume: worker rows of this cohort that are currently
                // marked draining are flipped back to active right away, so
                // the read endpoint reflects the resume without waiting for
                // each worker's next heartbeat to restamp its status.
                WorkerRegistration::query()
                    ->where('namespace', $namespace)
                    ->where('task_queue', $taskQueue)
                    ->when(
                        $publicBuildId !== null,
                        fn ($query) => $query->where('build_id', $publicBuildId),
                        // Unversioned workers may be stored with either a
                        // null or an empty build_id.
                        fn ($query) => $query->where(function ($q) {
                            $q->whereNull('build_id')->orWhere('build_id', '');
                        }),
                    )
                    ->where('status', WorkerBuildIdRollout::DRAIN_INTENT_DRAINING)
                    ->update(['status' => 'active']);
            }

            return $rollout;
        }
    );

    return ControlPlaneProtocol::json([
        'namespace' => $namespace,
        'task_queue' => $taskQueue,
        'build_id' => $publicBuildId,
        'drain_intent' => $rollout->drain_intent,
        'drained_at' => $rollout->drained_at?->toJSON(),
    ]);
}
325+
326+
/**
 * Load every rollout record stored for the given namespace and task queue,
 * indexed by the stored build_id column cast to string.
 *
 * @return array<string, WorkerBuildIdRollout>
 */
private function rolloutsForTaskQueue(string $namespace, string $taskQueue): array
{
    return WorkerBuildIdRollout::query()
        ->where('namespace', $namespace)
        ->where('task_queue', $taskQueue)
        ->get()
        ->keyBy(static fn (WorkerBuildIdRollout $rollout): string => (string) $rollout->build_id)
        ->all();
}
343+
344+
/**
 * Collapse worker counts and operator drain intent into one rollout label.
 *
 * Precedence, first match wins:
 *  - live active workers: "active_with_draining" when the operator marked
 *    the cohort draining or some workers are draining, otherwise "active";
 *  - draining workers, or drain intent with no active workers: "draining"
 *    (keeps a drained cohort visible even once its workers are gone);
 *  - only stale workers: "stale_only";
 *  - nothing at all: "no_workers".
 */
private function buildIdRolloutStatus(
    int $active,
    int $draining,
    int $stale,
    string $drainIntent = WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE,
): string {
    $operatorDraining = $drainIntent === WorkerBuildIdRollout::DRAIN_INTENT_DRAINING;
    $anyDraining = $operatorDraining || $draining > 0;

    return match (true) {
        $active > 0 => $anyDraining ? 'active_with_draining' : 'active',
        $anyDraining => 'draining',
        $stale > 0 => 'stale_only',
        default => 'no_workers',
    };
}
209369

app/Http/Controllers/Api/WorkerController.php

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
namespace App\Http\Controllers\Api;
44

5+
use App\Models\WorkerBuildIdRollout;
56
use App\Models\WorkerRegistration;
67
use App\Support\HistoryRetentionEnforcer;
78
use App\Support\NamespaceWorkflowScope;
@@ -98,6 +99,12 @@ public function register(Request $request): JsonResponse
9899
);
99100
}
100101

102+
$registrationStatus = $this->workerRegistrationStatus(
103+
$namespace,
104+
$validated['task_queue'],
105+
$validated['build_id'] ?? null,
106+
);
107+
101108
WorkerRegistration::updateOrCreate(
102109
[
103110
'worker_id' => $workerId,
@@ -114,7 +121,7 @@ public function register(Request $request): JsonResponse
114121
'max_concurrent_workflow_tasks' => $validated['max_concurrent_workflow_tasks'] ?? 100,
115122
'max_concurrent_activity_tasks' => $validated['max_concurrent_activity_tasks'] ?? 100,
116123
'last_heartbeat_at' => now(),
117-
'status' => 'active',
124+
'status' => $registrationStatus,
118125
]
119126
);
120127

@@ -240,9 +247,15 @@ public function heartbeat(Request $request): JsonResponse
240247
], 404);
241248
}
242249

250+
$heartbeatStatus = $this->workerRegistrationStatus(
251+
$worker->namespace,
252+
$worker->task_queue,
253+
is_string($worker->build_id) ? $worker->build_id : null,
254+
);
255+
243256
$worker->update([
244257
'last_heartbeat_at' => now(),
245-
'status' => 'active',
258+
'status' => $heartbeatStatus,
246259
]);
247260

248261
StandaloneWorkerVisibility::recordCompatibility(
@@ -1166,4 +1179,30 @@ private function workflowTaskStopReason(mixed $runStatus): string
11661179
default => 'run_closed',
11671180
};
11681181
}
1182+
1183+
/**
 * Decide which status a worker row gets on register/heartbeat.
 *
 * Looks up the rollout record for the worker's (namespace, task queue,
 * build_id) cohort. When that record carries a draining intent the worker
 * is stamped "draining"; in every other case, including no record at all,
 * it is stamped "active".
 */
private function workerRegistrationStatus(
    string $namespace,
    string $taskQueue,
    ?string $buildId,
): string {
    $cohort = WorkerBuildIdRollout::query()
        ->where('namespace', $namespace)
        ->where('task_queue', $taskQueue)
        ->where('build_id', WorkerBuildIdRollout::buildIdKey($buildId))
        ->first();

    $cohortIsDraining = $cohort instanceof WorkerBuildIdRollout
        ? $cohort->isDraining()
        : false;

    return $cohortIsDraining
        ? WorkerBuildIdRollout::DRAIN_INTENT_DRAINING
        : 'active';
}
11691208
}
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
<?php
2+
3+
namespace App\Models;
4+
5+
use Illuminate\Database\Eloquent\Model;
6+
7+
/**
 * Operator rollout intent for one build_id cohort of a task queue.
 *
 * Rows live in `workflow_worker_build_id_rollouts`. The unversioned cohort
 * (workers without a build_id) is stored under the empty-string key rather
 * than null.
 */
class WorkerBuildIdRollout extends Model
{
    /** Cohort accepts work normally. */
    public const DRAIN_INTENT_ACTIVE = 'active';

    /** Operator asked for the cohort to be drained. */
    public const DRAIN_INTENT_DRAINING = 'draining';

    /** Storage key that stands for "no build_id". */
    public const UNVERSIONED_KEY = '';

    protected $table = 'workflow_worker_build_id_rollouts';

    protected $fillable = [
        'namespace',
        'task_queue',
        'build_id',
        'drain_intent',
        'drained_at',
    ];

    protected function casts(): array
    {
        return ['drained_at' => 'datetime'];
    }

    /**
     * Map a public build_id (possibly null or blank) to its storage key:
     * surrounding whitespace is trimmed, and null/blank collapse to the
     * unversioned key.
     */
    public static function buildIdKey(?string $buildId): string
    {
        $normalized = trim($buildId ?? '');

        return $normalized !== '' ? $normalized : self::UNVERSIONED_KEY;
    }

    /**
     * Inverse of buildIdKey(): the unversioned key becomes null, any other
     * key is returned unchanged.
     */
    public static function buildIdFromKey(string $key): ?string
    {
        if ($key === self::UNVERSIONED_KEY) {
            return null;
        }

        return $key;
    }

    /** The build_id as exposed to API clients (null for the unversioned cohort). */
    public function publicBuildId(): ?string
    {
        $storedKey = (string) $this->build_id;

        return self::buildIdFromKey($storedKey);
    }

    /** Whether the operator intent on this cohort is "draining". */
    public function isDraining(): bool
    {
        return self::DRAIN_INTENT_DRAINING === $this->drain_intent;
    }
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
<?php
2+
3+
use Illuminate\Database\Migrations\Migration;
4+
use Illuminate\Database\Schema\Blueprint;
5+
use Illuminate\Support\Facades\Schema;
6+
7+
return new class extends Migration
{
    /**
     * Create the table that stores operator drain intent per
     * (namespace, task_queue, build_id) cohort.
     */
    public function up(): void
    {
        Schema::create('workflow_worker_build_id_rollouts', static function (Blueprint $table): void {
            $table->id();
            $table->string('namespace', 128);
            $table->string('task_queue', 255);

            // Holds the worker build_id, or '' for the unversioned
            // (pre-rollout) cohort. Kept non-null on purpose: several
            // databases treat nulls as distinct inside a unique index,
            // which would let duplicate unversioned rows slip through.
            $table->string('build_id', 255)->default('');

            $table->string('drain_intent', 32)->default('active');
            $table->timestamp('drained_at')->nullable();
            $table->timestamps();

            // One rollout row per cohort.
            $table->unique(
                ['namespace', 'task_queue', 'build_id'],
                'workflow_build_id_rollouts_scope_unique'
            );
        });
    }

    /**
     * Drop the rollout table again.
     */
    public function down(): void
    {
        Schema::dropIfExists('workflow_worker_build_id_rollouts');
    }
};

routes/api.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,8 @@
145145
// Task-queue endpoints, all behind the $operator, $cpv and $ns middleware.
// The two POST routes set the drain intent of a build_id cohort; the GET
// routes are read-only views.
Route::prefix('task-queues')
    ->middleware([$operator, $cpv, $ns])
    ->group(static function (): void {
        Route::get('/', [TaskQueueController::class, 'index']);
        Route::get('/{taskQueue}/build-ids', [TaskQueueController::class, 'buildIds']);
        Route::post('/{taskQueue}/build-ids/drain', [TaskQueueController::class, 'drainBuildId']);
        Route::post('/{taskQueue}/build-ids/resume', [TaskQueueController::class, 'resumeBuildId']);
        Route::get('/{taskQueue}', [TaskQueueController::class, 'show']);
    });
150152

0 commit comments

Comments
 (0)