@@ -65,6 +65,175 @@ public function show(Request $request, string $taskQueue): JsonResponse
6565 );
6666 }
6767
68+ /**
69+ * Aggregate worker registrations by build_id for one task queue.
70+ *
71+ * Operators use this to answer "which builds can still claim work, and
72+ * is it safe to drain or remove the older build now?" before deleting
73+ * stale worker rows or rolling forward to a new build_id. Workers with
74+ * no build_id are reported under a null build_id row that represents
75+ * the unversioned cohort (the pre-rollout default).
76+ */
77+ public function buildIds (Request $ request , string $ taskQueue ): JsonResponse
78+ {
79+ if ($ response = ControlPlaneProtocol::rejectUnsupported ($ request )) {
80+ return $ response ;
81+ }
82+
83+ $ namespace = (string ) $ request ->attributes ->get ('namespace ' );
84+ $ staleAfter = $ this ->workerStaleAfterSeconds ();
85+ $ now = now ();
86+
87+ $ workers = WorkerRegistration::query ()
88+ ->where ('namespace ' , $ namespace )
89+ ->where ('task_queue ' , $ taskQueue )
90+ ->orderByDesc ('last_heartbeat_at ' )
91+ ->orderBy ('worker_id ' )
92+ ->get ();
93+
94+ $ groups = [];
95+
96+ foreach ($ workers as $ worker ) {
97+ $ buildId = is_string ($ worker ->build_id ) && trim ($ worker ->build_id ) !== ''
98+ ? trim ($ worker ->build_id )
99+ : null ;
100+ $ key = $ buildId ?? '__unversioned__ ' ;
101+
102+ $ heartbeat = $ worker ->last_heartbeat_at ;
103+ $ isStale = $ heartbeat
104+ && $ heartbeat ->lt ($ now ->copy ()->subSeconds ($ staleAfter ));
105+ $ declaredStatus = is_string ($ worker ->status ) ? $ worker ->status : 'active ' ;
106+ $ effectiveStatus = $ isStale ? 'stale ' : $ declaredStatus ;
107+
108+ $ groups [$ key ] ??= [
109+ 'build_id ' => $ buildId ,
110+ 'active_worker_count ' => 0 ,
111+ 'stale_worker_count ' => 0 ,
112+ 'draining_worker_count ' => 0 ,
113+ 'total_worker_count ' => 0 ,
114+ 'runtimes ' => [],
115+ 'sdk_versions ' => [],
116+ 'last_heartbeat_at ' => null ,
117+ 'first_seen_at ' => null ,
118+ ];
119+
120+ $ groups [$ key ]['total_worker_count ' ]++;
121+ if ($ effectiveStatus === 'stale ' ) {
122+ $ groups [$ key ]['stale_worker_count ' ]++;
123+ } elseif ($ effectiveStatus === 'draining ' ) {
124+ $ groups [$ key ]['draining_worker_count ' ]++;
125+ } else {
126+ $ groups [$ key ]['active_worker_count ' ]++;
127+ }
128+
129+ if (is_string ($ worker ->runtime ) && trim ($ worker ->runtime ) !== '' ) {
130+ $ groups [$ key ]['runtimes ' ][trim ($ worker ->runtime )] = true ;
131+ }
132+ if (is_string ($ worker ->sdk_version ) && trim ($ worker ->sdk_version ) !== '' ) {
133+ $ groups [$ key ]['sdk_versions ' ][trim ($ worker ->sdk_version )] = true ;
134+ }
135+
136+ if ($ heartbeat !== null ) {
137+ $ existing = $ groups [$ key ]['last_heartbeat_at ' ];
138+ if ($ existing === null || $ heartbeat ->gt ($ existing )) {
139+ $ groups [$ key ]['last_heartbeat_at ' ] = $ heartbeat ;
140+ }
141+ }
142+
143+ $ createdAt = $ worker ->created_at ;
144+ if ($ createdAt !== null ) {
145+ $ existing = $ groups [$ key ]['first_seen_at ' ];
146+ if ($ existing === null || $ createdAt ->lt ($ existing )) {
147+ $ groups [$ key ]['first_seen_at ' ] = $ createdAt ;
148+ }
149+ }
150+ }
151+
152+ $ buildIds = [];
153+ foreach ($ groups as $ group ) {
154+ $ runtimes = array_keys ($ group ['runtimes ' ]);
155+ sort ($ runtimes );
156+ $ sdkVersions = array_keys ($ group ['sdk_versions ' ]);
157+ sort ($ sdkVersions );
158+
159+ $ buildIds [] = [
160+ 'build_id ' => $ group ['build_id ' ],
161+ 'rollout_status ' => $ this ->buildIdRolloutStatus (
162+ $ group ['active_worker_count ' ],
163+ $ group ['draining_worker_count ' ],
164+ $ group ['stale_worker_count ' ],
165+ ),
166+ 'active_worker_count ' => $ group ['active_worker_count ' ],
167+ 'draining_worker_count ' => $ group ['draining_worker_count ' ],
168+ 'stale_worker_count ' => $ group ['stale_worker_count ' ],
169+ 'total_worker_count ' => $ group ['total_worker_count ' ],
170+ 'runtimes ' => $ runtimes ,
171+ 'sdk_versions ' => $ sdkVersions ,
172+ 'last_heartbeat_at ' => $ group ['last_heartbeat_at ' ]?->toJSON(),
173+ 'first_seen_at ' => $ group ['first_seen_at ' ]?->toJSON(),
174+ ];
175+ }
176+
177+ usort ($ buildIds , function (array $ a , array $ b ): int {
178+ $ rankA = $ this ->buildIdRolloutRank ($ a );
179+ $ rankB = $ this ->buildIdRolloutRank ($ b );
180+ if ($ rankA !== $ rankB ) {
181+ return $ rankA <=> $ rankB ;
182+ }
183+ $ heartA = $ a ['last_heartbeat_at ' ] ?? '' ;
184+ $ heartB = $ b ['last_heartbeat_at ' ] ?? '' ;
185+
186+ return strcmp ($ heartB , $ heartA );
187+ });
188+
189+ return ControlPlaneProtocol::json ([
190+ 'namespace ' => $ namespace ,
191+ 'task_queue ' => $ taskQueue ,
192+ 'stale_after_seconds ' => $ staleAfter ,
193+ 'build_ids ' => $ buildIds ,
194+ ]);
195+ }
196+
197+ private function buildIdRolloutStatus (int $ active , int $ draining , int $ stale ): string
198+ {
199+ if ($ active > 0 ) {
200+ return $ draining > 0 ? 'active_with_draining ' : 'active ' ;
201+ }
202+
203+ if ($ draining > 0 ) {
204+ return 'draining ' ;
205+ }
206+
207+ return $ stale > 0 ? 'stale_only ' : 'no_workers ' ;
208+ }
209+
210+ /**
211+ * Sort key: rollout-active builds first, then draining, then stale.
212+ * Within each rollout-status bucket the unversioned cohort sorts last
213+ * so the named builds an operator is rolling out are visible above
214+ * the legacy default — but a stale named build still sorts below an
215+ * active unversioned cohort.
216+ *
217+ * @param array<string, mixed> $entry
218+ */
219+ private function buildIdRolloutRank (array $ entry ): int
220+ {
221+ $ statusRank = match ($ entry ['rollout_status ' ] ?? '' ) {
222+ 'active ' => 0 ,
223+ 'active_with_draining ' => 1 ,
224+ 'draining ' => 2 ,
225+ 'stale_only ' => 3 ,
226+ default => 4 ,
227+ };
228+
229+ $ rank = $ statusRank * 2 ;
230+ if (($ entry ['build_id ' ] ?? null ) === null ) {
231+ $ rank += 1 ;
232+ }
233+
234+ return $ rank ;
235+ }
236+
68237 /**
69238 * @param array<string, mixed> $payload
70239 * @return array<string, mixed>
0 commit comments