Skip to content

Commit d84b290

Browse files
Expose worker build-id rollout on the Python SDK
Add `Client.list_task_queue_build_ids(task_queue)` and its sync mirror so operators using the Python SDK can verify which build cohorts can claim work on a queue before draining or removing an older build. The method calls the server's task-queue build-ids endpoint and returns a `TaskQueueBuildIdRollout` with per-cohort rollout status, active, draining, and stale worker counts, runtime and SDK-version mix, first-seen timestamp, and most-recent heartbeat. Unversioned workers are grouped under a cohort whose `build_id` is `None`. Consume the `task-queue-build-ids-parity.json` fixture byte-identically with the CLI so CI catches any divergence before it hits operators. Issue: zorporation/durable-workflow#491 Loop-ID: build-01
1 parent cbf0561 commit d84b290

6 files changed

Lines changed: 324 additions & 0 deletions

File tree

src/durable_workflow/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
StoragePayloadTestResult,
3333
StorageTestResult,
3434
TaskQueueAdmission,
35+
TaskQueueBuildIdCohort,
36+
TaskQueueBuildIdRollout,
3537
TaskQueueDescription,
3638
TaskQueueList,
3739
TaskQueueQueryAdmission,
@@ -177,6 +179,8 @@
177179
"StorageTestResult",
178180
"StartChildWorkflow",
179181
"TaskQueueAdmission",
182+
"TaskQueueBuildIdCohort",
183+
"TaskQueueBuildIdRollout",
180184
"TaskQueueDescription",
181185
"TaskQueueList",
182186
"TaskQueueQueryAdmission",

src/durable_workflow/client.py

Lines changed: 89 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,73 @@ class TaskQueueList:
467467
task_queues: list[TaskQueueDescription]
468468

469469

470+
@dataclass
471+
class TaskQueueBuildIdCohort:
472+
"""Per-build-id rollout state for one task queue.
473+
474+
``build_id`` is ``None`` for the cohort of workers that registered
475+
without a build identifier (the legacy unversioned default).
476+
"""
477+
478+
build_id: str | None
479+
rollout_status: str
480+
active_worker_count: int
481+
draining_worker_count: int
482+
stale_worker_count: int
483+
total_worker_count: int
484+
runtimes: list[str]
485+
sdk_versions: list[str]
486+
last_heartbeat_at: str | None = None
487+
first_seen_at: str | None = None
488+
raw: dict[str, Any] | None = None
489+
490+
@classmethod
491+
def from_dict(cls, data: dict[str, Any]) -> TaskQueueBuildIdCohort:
492+
runtimes = data.get("runtimes")
493+
sdk_versions = data.get("sdk_versions")
494+
return cls(
495+
build_id=data.get("build_id"),
496+
rollout_status=str(data.get("rollout_status") or ""),
497+
active_worker_count=int(data.get("active_worker_count") or 0),
498+
draining_worker_count=int(data.get("draining_worker_count") or 0),
499+
stale_worker_count=int(data.get("stale_worker_count") or 0),
500+
total_worker_count=int(data.get("total_worker_count") or 0),
501+
runtimes=[r for r in runtimes if isinstance(r, str)] if isinstance(runtimes, list) else [],
502+
sdk_versions=[v for v in sdk_versions if isinstance(v, str)] if isinstance(sdk_versions, list) else [],
503+
last_heartbeat_at=data.get("last_heartbeat_at"),
504+
first_seen_at=data.get("first_seen_at"),
505+
raw=data,
506+
)
507+
508+
509+
@dataclass
510+
class TaskQueueBuildIdRollout:
511+
"""Build-id rollout snapshot returned by the server for one task queue."""
512+
513+
namespace: str | None
514+
task_queue: str
515+
stale_after_seconds: int | None
516+
build_ids: list[TaskQueueBuildIdCohort]
517+
518+
@classmethod
519+
def from_dict(cls, data: dict[str, Any]) -> TaskQueueBuildIdRollout:
520+
items = data.get("build_ids")
521+
return cls(
522+
namespace=data.get("namespace"),
523+
task_queue=str(data.get("task_queue") or ""),
524+
stale_after_seconds=(
525+
int(data["stale_after_seconds"])
526+
if isinstance(data.get("stale_after_seconds"), int)
527+
else None
528+
),
529+
build_ids=[
530+
TaskQueueBuildIdCohort.from_dict(item)
531+
for item in (items if isinstance(items, list) else [])
532+
if isinstance(item, dict)
533+
],
534+
)
535+
536+
470537
@dataclass
471538
class WorkerDescription:
472539
"""Current server view of one registered worker."""
@@ -1303,6 +1370,28 @@ async def describe_task_queue(self, name: str) -> TaskQueueDescription:
13031370
)
13041371
return TaskQueueDescription.from_dict(data)
13051372

1373+
async def list_task_queue_build_ids(self, task_queue: str) -> TaskQueueBuildIdRollout:
1374+
"""Return the build-id rollout snapshot for ``task_queue``.
1375+
1376+
Use this before draining or removing an older build to confirm which
1377+
build cohorts can still claim work on the queue. Unversioned workers
1378+
are grouped under a cohort whose ``build_id`` is ``None``.
1379+
"""
1380+
data = await self._request(
1381+
"GET",
1382+
f"/task-queues/{quote(task_queue, safe='')}/build-ids",
1383+
context=task_queue,
1384+
)
1385+
if not isinstance(data, dict):
1386+
raise ServerError(
1387+
200,
1388+
{
1389+
"reason": "invalid_task_queue_build_ids_response",
1390+
"message": f"expected JSON object, got {type(data).__name__}",
1391+
},
1392+
)
1393+
return TaskQueueBuildIdRollout.from_dict(data)
1394+
13061395
# ── Search attributes ─────────────────────────────────────────────
13071396
async def list_search_attributes(self) -> SearchAttributeList:
13081397
"""List system and custom search attribute definitions for this namespace."""

src/durable_workflow/sync.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
ScheduleSpec,
1818
ScheduleTriggerResult,
1919
StorageTestResult,
20+
TaskQueueBuildIdRollout,
2021
TaskQueueDescription,
2122
TaskQueueList,
2223
WorkflowCommandResult,
@@ -284,6 +285,12 @@ def describe_task_queue(self, name: str) -> TaskQueueDescription:
284285
result: TaskQueueDescription = _run(self._async.describe_task_queue(name))
285286
return result
286287

288+
def list_task_queue_build_ids(self, task_queue: str) -> TaskQueueBuildIdRollout:
289+
result: TaskQueueBuildIdRollout = _run(
290+
self._async.list_task_queue_build_ids(task_queue)
291+
)
292+
return result
293+
287294
def list_namespaces(self) -> NamespaceList:
288295
result: NamespaceList = _run(self._async.list_namespaces())
289296
return result
Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
{
2+
"schema": "durable-workflow.polyglot.control-plane-request-fixture",
3+
"version": 1,
4+
"operation": "task_queue.build_ids",
5+
"request": {
6+
"method": "GET",
7+
"path": "/task-queues/orders-critical/build-ids"
8+
},
9+
"semantic_body": {
10+
"namespace": "orders-prod",
11+
"task_queue": "orders-critical",
12+
"build_ids": [
13+
"build-2026.04.22-a1",
14+
"build-2026.04.21-z9",
15+
null
16+
],
17+
"rollout_statuses": {
18+
"build-2026.04.22-a1": "active",
19+
"build-2026.04.21-z9": "draining",
20+
"unversioned": "stale_only"
21+
}
22+
},
23+
"response_body": {
24+
"namespace": "orders-prod",
25+
"task_queue": "orders-critical",
26+
"stale_after_seconds": 60,
27+
"build_ids": [
28+
{
29+
"build_id": "build-2026.04.22-a1",
30+
"rollout_status": "active",
31+
"active_worker_count": 2,
32+
"draining_worker_count": 0,
33+
"stale_worker_count": 0,
34+
"total_worker_count": 2,
35+
"runtimes": [
36+
"worker-runtime"
37+
],
38+
"sdk_versions": [
39+
"polyglot-sdk/2.0.0"
40+
],
41+
"last_heartbeat_at": "2026-04-22T09:30:00Z",
42+
"first_seen_at": "2026-04-22T08:00:00Z"
43+
},
44+
{
45+
"build_id": "build-2026.04.21-z9",
46+
"rollout_status": "draining",
47+
"active_worker_count": 0,
48+
"draining_worker_count": 1,
49+
"stale_worker_count": 0,
50+
"total_worker_count": 1,
51+
"runtimes": [
52+
"worker-runtime"
53+
],
54+
"sdk_versions": [
55+
"polyglot-sdk/1.9.8"
56+
],
57+
"last_heartbeat_at": "2026-04-22T09:29:00Z",
58+
"first_seen_at": "2026-04-21T10:00:00Z"
59+
},
60+
{
61+
"build_id": null,
62+
"rollout_status": "stale_only",
63+
"active_worker_count": 0,
64+
"draining_worker_count": 0,
65+
"stale_worker_count": 1,
66+
"total_worker_count": 1,
67+
"runtimes": [
68+
"worker-runtime"
69+
],
70+
"sdk_versions": [
71+
"polyglot-sdk/1.0.0"
72+
],
73+
"last_heartbeat_at": "2026-04-20T12:00:00Z",
74+
"first_seen_at": "2026-04-20T11:00:00Z"
75+
}
76+
]
77+
},
78+
"cli": {
79+
"argv": {
80+
"task-queue": "orders-critical",
81+
"--json": true
82+
}
83+
},
84+
"sdk_python": {
85+
"method": "list_task_queue_build_ids",
86+
"args": {
87+
"task_queue": "orders-critical"
88+
}
89+
}
90+
}

tests/test_client.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1014,6 +1014,103 @@ async def test_describe_task_queue_matches_polyglot_fixture(self, client: Client
10141014
assert result.admission.query_tasks.status == semantic["query_admission_status"]
10151015
assert [lease["task_id"] for lease in result.current_leases or []] == semantic["current_lease_ids"]
10161016

1017+
@pytest.mark.asyncio
1018+
async def test_list_task_queue_build_ids_matches_polyglot_fixture(self, client: Client) -> None:
1019+
fixture_path = (
1020+
Path(__file__).parent
1021+
/ "fixtures"
1022+
/ "control-plane"
1023+
/ "task-queue-build-ids-parity.json"
1024+
)
1025+
fixture = json.loads(fixture_path.read_text())
1026+
assert fixture["operation"] == "task_queue.build_ids"
1027+
sdk = fixture["sdk_python"]
1028+
resp = _mock_response(200, fixture["response_body"])
1029+
1030+
with patch.object(
1031+
client._http, "request", new_callable=AsyncMock, return_value=resp
1032+
) as mock:
1033+
result = await client.list_task_queue_build_ids(**sdk["args"])
1034+
1035+
assert mock.call_args.args[0] == fixture["request"]["method"]
1036+
assert mock.call_args.args[1] == f"/api{fixture['request']['path']}"
1037+
1038+
semantic = fixture["semantic_body"]
1039+
assert result.namespace == semantic["namespace"]
1040+
assert result.task_queue == semantic["task_queue"]
1041+
assert result.stale_after_seconds == fixture["response_body"]["stale_after_seconds"]
1042+
assert [cohort.build_id for cohort in result.build_ids] == semantic["build_ids"]
1043+
1044+
rollout_statuses = {
1045+
(cohort.build_id if cohort.build_id is not None else "unversioned"): cohort.rollout_status
1046+
for cohort in result.build_ids
1047+
}
1048+
assert rollout_statuses == semantic["rollout_statuses"]
1049+
1050+
@pytest.mark.asyncio
1051+
async def test_list_task_queue_build_ids_surfaces_cohort_worker_counts(
1052+
self, client: Client
1053+
) -> None:
1054+
resp = _mock_response(
1055+
200,
1056+
{
1057+
"namespace": "default",
1058+
"task_queue": "orders",
1059+
"stale_after_seconds": 90,
1060+
"build_ids": [
1061+
{
1062+
"build_id": "build-alpha",
1063+
"rollout_status": "active_with_draining",
1064+
"active_worker_count": 2,
1065+
"draining_worker_count": 1,
1066+
"stale_worker_count": 0,
1067+
"total_worker_count": 3,
1068+
"runtimes": ["worker-runtime"],
1069+
"sdk_versions": ["polyglot-sdk/2.0.0"],
1070+
"last_heartbeat_at": "2026-04-22T09:30:00Z",
1071+
"first_seen_at": "2026-04-22T08:00:00Z",
1072+
},
1073+
{
1074+
"build_id": None,
1075+
"rollout_status": "stale_only",
1076+
"active_worker_count": 0,
1077+
"draining_worker_count": 0,
1078+
"stale_worker_count": 1,
1079+
"total_worker_count": 1,
1080+
"runtimes": [],
1081+
"sdk_versions": [],
1082+
"last_heartbeat_at": None,
1083+
"first_seen_at": None,
1084+
},
1085+
],
1086+
},
1087+
)
1088+
1089+
with patch.object(
1090+
client._http, "request", new_callable=AsyncMock, return_value=resp
1091+
) as mock:
1092+
result = await client.list_task_queue_build_ids("orders")
1093+
1094+
assert mock.call_args.args[0] == "GET"
1095+
assert mock.call_args.args[1] == "/api/task-queues/orders/build-ids"
1096+
assert result.task_queue == "orders"
1097+
assert result.stale_after_seconds == 90
1098+
assert len(result.build_ids) == 2
1099+
1100+
alpha, unversioned = result.build_ids
1101+
assert alpha.build_id == "build-alpha"
1102+
assert alpha.rollout_status == "active_with_draining"
1103+
assert alpha.active_worker_count == 2
1104+
assert alpha.draining_worker_count == 1
1105+
assert alpha.total_worker_count == 3
1106+
assert alpha.runtimes == ["worker-runtime"]
1107+
assert alpha.sdk_versions == ["polyglot-sdk/2.0.0"]
1108+
assert unversioned.build_id is None
1109+
assert unversioned.rollout_status == "stale_only"
1110+
assert unversioned.stale_worker_count == 1
1111+
assert unversioned.last_heartbeat_at is None
1112+
assert unversioned.first_seen_at is None
1113+
10171114
@pytest.mark.asyncio
10181115
async def test_list_task_queues_parses_admission(self, client: Client) -> None:
10191116
resp = _mock_response(200, {

tests/test_sync.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,43 @@ def test_describe_task_queue(self) -> None:
161161
assert result.admission.activity_tasks.status == "no_slots"
162162
assert mock.call_args.args[:2] == ("GET", "/api/task-queues/orders%2Fhigh%20priority")
163163

164+
def test_list_task_queue_build_ids(self) -> None:
165+
client = Client("http://localhost:8080")
166+
resp = _mock_response(
167+
200,
168+
{
169+
"namespace": "default",
170+
"task_queue": "orders",
171+
"stale_after_seconds": 60,
172+
"build_ids": [
173+
{
174+
"build_id": "build-alpha",
175+
"rollout_status": "active",
176+
"active_worker_count": 2,
177+
"draining_worker_count": 0,
178+
"stale_worker_count": 0,
179+
"total_worker_count": 2,
180+
"runtimes": ["worker-runtime"],
181+
"sdk_versions": ["polyglot-sdk/2.0.0"],
182+
"last_heartbeat_at": "2026-04-22T09:30:00Z",
183+
"first_seen_at": "2026-04-22T08:00:00Z",
184+
}
185+
],
186+
},
187+
)
188+
with patch.object(
189+
client._async._http, "request", new_callable=AsyncMock, return_value=resp
190+
) as mock:
191+
result = client.list_task_queue_build_ids("orders")
192+
assert result.task_queue == "orders"
193+
assert result.stale_after_seconds == 60
194+
assert len(result.build_ids) == 1
195+
cohort = result.build_ids[0]
196+
assert cohort.build_id == "build-alpha"
197+
assert cohort.rollout_status == "active"
198+
assert cohort.total_worker_count == 2
199+
assert mock.call_args.args[:2] == ("GET", "/api/task-queues/orders/build-ids")
200+
164201

165202
class TestSyncClientRunVisibility:
166203
def test_list_workflow_runs(self) -> None:

0 commit comments

Comments
 (0)