Skip to content

Commit 3717d80

Browse files
committed
[zephyr] Drop dead per-worker sidecar cache
Each shard runs in its own subprocess, so a reducer calls from_sidecars once — the module-level cache never gets a hit in production. Inline the read into _read_sidecars_parallel and leave a TODO for a proper worker-level cache if we ever colocate reducers.
1 parent d73d608 commit 3717d80

1 file changed

Lines changed: 8 additions & 13 deletions

File tree

lib/zephyr/src/zephyr/shuffle.py

Lines changed: 8 additions & 13 deletions
Original file line number | Diff line number | Diff line change
@@ -120,27 +120,22 @@ def _write_scatter_meta(data_path: str, sidecar: dict) -> None:
120120
f.write(payload)
121121

122122

123-
# Per-worker cache for sidecar reads.
124-
_scatter_meta_cache: dict[str, dict] = {}
125-
126-
127-
def _read_scatter_meta(data_path: str) -> dict:
128-
meta_path = _scatter_meta_path(data_path)
129-
if meta_path not in _scatter_meta_cache:
130-
with open_url(meta_path, "r") as f:
131-
_scatter_meta_cache[meta_path] = json.loads(f.read())
132-
return _scatter_meta_cache[meta_path]
133-
134-
135123
def _read_sidecars_parallel(scatter_paths: list[str]) -> list[tuple[str, dict]]:
136124
"""Read every ``.scatter_meta`` sidecar concurrently, preserving input order.
137125
138126
Each reducer calls this to build its ``ScatterReader`` directly from the
139127
per-mapper sidecars, without going through a coordinator-written manifest.
128+
129+
TODO(rav): each reducer subprocess re-reads every sidecar even though only
130+
one shard's byte ranges are used. A worker-level sidecar cache (or a shared
131+
read across colocated reducers) would avoid the redundant GCS GETs when
132+
many reducers run on the same host.
140133
"""
141134

142135
def _read_entry(path: str) -> tuple[str, dict]:
143-
return path, _read_scatter_meta(path)
136+
meta_path = _scatter_meta_path(path)
137+
with open_url(meta_path, "r") as f:
138+
return path, json.loads(f.read())
144139

145140
results: dict[str, dict] = {}
146141
with concurrent.futures.ThreadPoolExecutor(max_workers=_SIDECAR_READ_CONCURRENCY) as pool:

0 commit comments

Comments (0)