Skip to content

Commit cb994f9

Browse files
committed
Clean tuples dict keys from workers_info in /api/v1/retire_workers.
1 parent fd3722d commit cb994f9

File tree

2 files changed

+52
-0
lines changed

2 files changed

+52
-0
lines changed

distributed/http/scheduler/api.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,18 @@ async def post(self):
2424
else:
2525
workers = params.get("workers", {})
2626
workers_info = await scheduler.retire_workers(workers=workers)
27+
28+
# digests_total_since_heartbeat contains tuples as keys which are not json serializable
29+
def clean_dict(d):
30+
if not isinstance(d, dict):
31+
return d
32+
return {
33+
k: clean_dict(v)
34+
for k, v in d.items()
35+
if not isinstance(k, tuple)
36+
}
37+
38+
workers_info = clean_dict(workers_info)
2739
self.write(json.dumps(workers_info))
2840
except Exception as e:
2941
self.set_status(500, str(e))

distributed/http/scheduler/tests/test_scheduler_http.py

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -720,6 +720,46 @@ async def test_retire_workers(c, s, a, b):
720720
assert len(retired_workers_info) == 2
721721

722722

723+
@gen_cluster(
724+
client=True,
725+
clean_kwargs={"threads": False},
726+
config={
727+
"distributed.scheduler.http.routes": DEFAULT_ROUTES
728+
+ ["distributed.http.scheduler.api"]
729+
},
730+
)
731+
async def test_retire_workers_with_tuple_keys(c, s, a, b):
732+
aiohttp = pytest.importorskip("aiohttp")
733+
734+
worker_address = "tcp://172.17.0.3:39571"
735+
async def mock_retire_workers(*args, **kwargs):
736+
# Return problematic data structure
737+
# tuple are not json serializable
738+
return {
739+
worker_address: {
740+
"type": "Worker",
741+
"metrics": {
742+
"digests_total_since_heartbeat": {
743+
("execute", "slowadd", "thread-cpu", "seconds"): 0.0003396660000000093,
744+
},
745+
}
746+
}}
747+
748+
# Replace the method with our mock
749+
s.retire_workers = mock_retire_workers
750+
751+
async with aiohttp.ClientSession() as session:
752+
params = {"workers": [a.address, b.address]}
753+
async with session.post(
754+
"http://localhost:%d/api/v1/retire_workers" % s.http_server.port,
755+
json=params,
756+
) as resp:
757+
assert resp.status == 200
758+
assert resp.headers["Content-Type"] == "application/json"
759+
retired_workers_info = json.loads(await resp.text())
760+
assert worker_address in retired_workers_info
761+
762+
723763
@gen_cluster(
724764
client=True,
725765
clean_kwargs={"threads": False},

0 commit comments

Comments
 (0)