Skip to content

Commit 0b14e61

Browse files
committed
zephyr: restore write_batch_size wiring + fix lint
Dropped during the previous rebase when the byte-budgeted write batch commit was collapsed — plan.py still needs to pass write_batch_size through to external_sort_merge. Also removes an unused TaskResources import in shuffle.py flagged by ruff.
1 parent 11a7980 commit 0b14e61

File tree

2 files changed

+10
-4
lines changed

2 files changed

+10
-4
lines changed

lib/zephyr/src/zephyr/plan.py

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -644,7 +644,7 @@ def merge_key(item):
644644
)
645645

646646
if use_external:
647-
from zephyr.external_sort import compute_fan_in
647+
from zephyr.external_sort import compute_fan_in, compute_write_batch_size
648648

649649
memory_limit = _TaskResources.from_environment().memory_bytes
650650
# Per-iterator memory ~= compressed bytes for one chunk held by
@@ -653,16 +653,23 @@ def merge_key(item):
653653
# mediocre zstd ratio.
654654
per_iter_bytes = int(shard.max_chunk_rows * shard.avg_item_bytes)
655655
fan_in = compute_fan_in(per_iter_bytes, memory_limit)
656+
write_batch_size = compute_write_batch_size(shard.avg_item_bytes)
656657
logger.info(
657-
"External sort triggered for shard with %d iterators, fan_in=%d (per_iter≈%dKB), spilling to %s",
658+
"External sort triggered for shard with %d iterators, "
659+
"fan_in=%d (per_iter≈%dKB), write_batch_size=%d, spilling to %s",
658660
sum(it.chunk_count for it in shard.iterators),
659661
fan_in,
660662
per_iter_bytes // 1024,
663+
write_batch_size,
661664
external_sort_dir,
662665
)
663666
# Pass lazy generator — external_sort_merge consumes in batches without opening all files
664667
merged_stream = external_sort_merge(
665-
shard.get_iterators(), merge_key, external_sort_dir, fan_in=fan_in
668+
shard.get_iterators(),
669+
merge_key,
670+
external_sort_dir,
671+
fan_in=fan_in,
672+
write_batch_size=write_batch_size,
666673
)
667674
else:
668675
chunk_iterators = list(shard.get_iterators())

lib/zephyr/src/zephyr/shuffle.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,6 @@
4343
from typing import Any
4444

4545
import zstandard as zstd
46-
from iris.env_resources import TaskResources as _TaskResources
4746
from rigging.filesystem import open_url, url_to_fs
4847
from rigging.timing import log_time
4948

0 commit comments

Comments
 (0)