66use App \Models \WorkerRegistration ;
77use App \Support \ControlPlaneProtocol ;
88use App \Support \TaskQueueAdmission ;
9+ use App \Support \TaskQueueBuildIdRolloutSnapshot ;
910use App \Support \WorkflowQueryTaskBroker ;
1011use Illuminate \Http \JsonResponse ;
1112use Illuminate \Http \Request ;
@@ -18,6 +19,7 @@ class TaskQueueController
1819 public function __construct (
1920 private readonly WorkflowQueryTaskBroker $ queryTasks ,
2021 private readonly TaskQueueAdmission $ admission ,
22+ private readonly TaskQueueBuildIdRolloutSnapshot $ buildIdRollouts ,
2123 ) {}
2224
2325 public function index (Request $ request ): JsonResponse
@@ -90,156 +92,12 @@ public function buildIds(Request $request, string $taskQueue): JsonResponse
9092 return $ response ;
9193 }
9294
93- $ namespace = (string ) $ request ->attributes ->get ('namespace ' );
94- $ staleAfter = $ this ->workerStaleAfterSeconds ();
95- $ now = now ();
96-
97- $ workers = WorkerRegistration::query ()
98- ->where ('namespace ' , $ namespace )
99- ->where ('task_queue ' , $ taskQueue )
100- ->orderByDesc ('last_heartbeat_at ' )
101- ->orderBy ('worker_id ' )
102- ->get ();
103-
104- $ groups = [];
105-
106- foreach ($ workers as $ worker ) {
107- $ buildId = is_string ($ worker ->build_id ) && trim ($ worker ->build_id ) !== ''
108- ? trim ($ worker ->build_id )
109- : null ;
110- $ key = $ buildId ?? '__unversioned__ ' ;
111-
112- $ heartbeat = $ worker ->last_heartbeat_at ;
113- $ isStale = $ heartbeat
114- && $ heartbeat ->lt ($ now ->copy ()->subSeconds ($ staleAfter ));
115- $ declaredStatus = is_string ($ worker ->status ) ? $ worker ->status : 'active ' ;
116- $ effectiveStatus = $ isStale ? 'stale ' : $ declaredStatus ;
117-
118- $ groups [$ key ] ??= [
119- 'build_id ' => $ buildId ,
120- 'active_worker_count ' => 0 ,
121- 'stale_worker_count ' => 0 ,
122- 'draining_worker_count ' => 0 ,
123- 'total_worker_count ' => 0 ,
124- 'runtimes ' => [],
125- 'sdk_versions ' => [],
126- 'last_heartbeat_at ' => null ,
127- 'first_seen_at ' => null ,
128- ];
129-
130- $ groups [$ key ]['total_worker_count ' ]++;
131- if ($ effectiveStatus === 'stale ' ) {
132- $ groups [$ key ]['stale_worker_count ' ]++;
133- } elseif ($ effectiveStatus === 'draining ' ) {
134- $ groups [$ key ]['draining_worker_count ' ]++;
135- } else {
136- $ groups [$ key ]['active_worker_count ' ]++;
137- }
138-
139- if (is_string ($ worker ->runtime ) && trim ($ worker ->runtime ) !== '' ) {
140- $ groups [$ key ]['runtimes ' ][trim ($ worker ->runtime )] = true ;
141- }
142- if (is_string ($ worker ->sdk_version ) && trim ($ worker ->sdk_version ) !== '' ) {
143- $ groups [$ key ]['sdk_versions ' ][trim ($ worker ->sdk_version )] = true ;
144- }
145-
146- if ($ heartbeat !== null ) {
147- $ existing = $ groups [$ key ]['last_heartbeat_at ' ];
148- if ($ existing === null || $ heartbeat ->gt ($ existing )) {
149- $ groups [$ key ]['last_heartbeat_at ' ] = $ heartbeat ;
150- }
151- }
152-
153- $ createdAt = $ worker ->created_at ;
154- if ($ createdAt !== null ) {
155- $ existing = $ groups [$ key ]['first_seen_at ' ];
156- if ($ existing === null || $ createdAt ->lt ($ existing )) {
157- $ groups [$ key ]['first_seen_at ' ] = $ createdAt ;
158- }
159- }
160- }
161-
162- $ rolloutMap = $ this ->rolloutsForTaskQueue ($ namespace , $ taskQueue );
163-
164- $ buildIds = [];
165- foreach ($ groups as $ group ) {
166- $ runtimes = array_keys ($ group ['runtimes ' ]);
167- sort ($ runtimes );
168- $ sdkVersions = array_keys ($ group ['sdk_versions ' ]);
169- sort ($ sdkVersions );
170-
171- $ rolloutKey = WorkerBuildIdRollout::buildIdKey ($ group ['build_id ' ]);
172- $ rollout = $ rolloutMap [$ rolloutKey ] ?? null ;
173- $ drainIntent = $ rollout ?->drain_intent ?? WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE ;
174-
175- $ buildIds [] = [
176- 'build_id ' => $ group ['build_id ' ],
177- 'rollout_status ' => $ this ->buildIdRolloutStatus (
178- $ group ['active_worker_count ' ],
179- $ group ['draining_worker_count ' ],
180- $ group ['stale_worker_count ' ],
181- $ drainIntent ,
182- ),
183- 'drain_intent ' => $ drainIntent ,
184- 'drained_at ' => $ rollout ?->drained_at?->toJSON(),
185- 'active_worker_count ' => $ group ['active_worker_count ' ],
186- 'draining_worker_count ' => $ group ['draining_worker_count ' ],
187- 'stale_worker_count ' => $ group ['stale_worker_count ' ],
188- 'total_worker_count ' => $ group ['total_worker_count ' ],
189- 'runtimes ' => $ runtimes ,
190- 'sdk_versions ' => $ sdkVersions ,
191- 'last_heartbeat_at ' => $ group ['last_heartbeat_at ' ]?->toJSON(),
192- 'first_seen_at ' => $ group ['first_seen_at ' ]?->toJSON(),
193- ];
194- }
195-
196- // Surface rollout intent even for cohorts whose worker rows have
197- // been removed: operators still need to see "this build_id is
198- // marked draining" until they explicitly resume it.
199- foreach ($ rolloutMap as $ key => $ rollout ) {
200- if (isset ($ groups [$ key === '' ? '__unversioned__ ' : $ key ])) {
201- continue ;
202- }
203-
204- $ buildIds [] = [
205- 'build_id ' => $ rollout ->publicBuildId (),
206- 'rollout_status ' => $ this ->buildIdRolloutStatus (
207- 0 ,
208- 0 ,
209- 0 ,
210- $ rollout ->drain_intent ,
211- ),
212- 'drain_intent ' => $ rollout ->drain_intent ,
213- 'drained_at ' => $ rollout ->drained_at ?->toJSON(),
214- 'active_worker_count ' => 0 ,
215- 'draining_worker_count ' => 0 ,
216- 'stale_worker_count ' => 0 ,
217- 'total_worker_count ' => 0 ,
218- 'runtimes ' => [],
219- 'sdk_versions ' => [],
220- 'last_heartbeat_at ' => null ,
221- 'first_seen_at ' => null ,
222- ];
223- }
224-
225- usort ($ buildIds , function (array $ a , array $ b ): int {
226- $ rankA = $ this ->buildIdRolloutRank ($ a );
227- $ rankB = $ this ->buildIdRolloutRank ($ b );
228- if ($ rankA !== $ rankB ) {
229- return $ rankA <=> $ rankB ;
230- }
231- $ heartA = $ a ['last_heartbeat_at ' ] ?? '' ;
232- $ heartB = $ b ['last_heartbeat_at ' ] ?? '' ;
233-
234- return strcmp ($ heartB , $ heartA );
235- });
236-
237- return ControlPlaneProtocol::json ([
238- 'namespace ' => $ namespace ,
239- 'task_queue ' => $ taskQueue ,
240- 'stale_after_seconds ' => $ staleAfter ,
241- 'build_ids ' => $ buildIds ,
242- ]);
95+ return ControlPlaneProtocol::json (
96+ $ this ->buildIdRollouts ->forTaskQueue (
97+ (string ) $ request ->attributes ->get ('namespace ' ),
98+ $ taskQueue ,
99+ ),
100+ );
243101 }
244102
245103 /**
@@ -332,77 +190,6 @@ private function setBuildIdDrainIntent(Request $request, string $taskQueue, stri
332190 ]);
333191 }
334192
335- /**
336- * @return array<string, WorkerBuildIdRollout>
337- */
338- private function rolloutsForTaskQueue (string $ namespace , string $ taskQueue ): array
339- {
340- $ rollouts = WorkerBuildIdRollout::query ()
341- ->where ('namespace ' , $ namespace )
342- ->where ('task_queue ' , $ taskQueue )
343- ->get ();
344-
345- $ map = [];
346- foreach ($ rollouts as $ rollout ) {
347- $ map [(string ) $ rollout ->build_id ] = $ rollout ;
348- }
349-
350- return $ map ;
351- }
352-
353- private function buildIdRolloutStatus (
354- int $ active ,
355- int $ draining ,
356- int $ stale ,
357- string $ drainIntent = WorkerBuildIdRollout::DRAIN_INTENT_ACTIVE ,
358- ): string {
359- $ intentDraining = $ drainIntent === WorkerBuildIdRollout::DRAIN_INTENT_DRAINING ;
360-
361- if ($ active > 0 ) {
362- return $ intentDraining || $ draining > 0 ? 'active_with_draining ' : 'active ' ;
363- }
364-
365- if ($ draining > 0 ) {
366- return 'draining ' ;
367- }
368-
369- if ($ intentDraining ) {
370- // Operator intent is to drain, but no live workers remain to
371- // acknowledge it. Keep the cohort visible as draining so the
372- // rollout state is clear even after stale workers are purged.
373- return 'draining ' ;
374- }
375-
376- return $ stale > 0 ? 'stale_only ' : 'no_workers ' ;
377- }
378-
379- /**
380- * Sort key: rollout-active builds first, then draining, then stale.
381- * Within each rollout-status bucket the unversioned cohort sorts last
382- * so the named builds an operator is rolling out are visible above
383- * the legacy default — but a stale named build still sorts below an
384- * active unversioned cohort.
385- *
386- * @param array<string, mixed> $entry
387- */
388- private function buildIdRolloutRank (array $ entry ): int
389- {
390- $ statusRank = match ($ entry ['rollout_status ' ] ?? '' ) {
391- 'active ' => 0 ,
392- 'active_with_draining ' => 1 ,
393- 'draining ' => 2 ,
394- 'stale_only ' => 3 ,
395- default => 4 ,
396- };
397-
398- $ rank = $ statusRank * 2 ;
399- if (($ entry ['build_id ' ] ?? null ) === null ) {
400- $ rank += 1 ;
401- }
402-
403- return $ rank ;
404- }
405-
406193 /**
407194 * @param array<string, mixed> $payload
408195 * @return array<string, mixed>
0 commit comments