[zephyr] Honor num_output_shards when input has more shards#5166
Merged
ravwojdyla merged 2 commits intomainfrom Apr 25, 2026
Merged
[zephyr] Honor num_output_shards when input has more shards#5166ravwojdyla merged 2 commits intomainfrom
ravwojdyla merged 2 commits intomainfrom
Conversation
Scatter routes records into exactly num_output_shards buckets via hash(key) % num_output_shards. The reduce stage previously spawned max(input_shards, num_output_shards) tasks; with M input shards and N output shards where M > N, the extra M-N reduce tasks ran on a shard_idx no record hashes to and emitted empty output files. For marin#5162: coderforge normalize had 672 input files and a configured 29 output shards, producing 643 empty parquet shards alongside 29 real ones.
rjpower
approved these changes
Apr 25, 2026
| else: | ||
| num_output = max(max(result_refs.keys(), default=0) + 1, input_shard_count) | ||
| if output_shard_count is not None: | ||
| num_output = max(num_output, output_shard_count) |
Collaborator
There was a problem hiding this comment.
The behavior here is a little weird for non-scatter cases. It seems like for non-scatter we should just ignore output shards? If a user wants a different shard count we should be going through a reshard?
Contributor
There was a problem hiding this comment.
good point - that's cleaner and more direct - will update
stage.output_shards is only ever set on Scatter (via group_by) and Reshard stages, and Reshard short-circuits before reaching _regroup_result_refs. The non-scatter branch's max-with-output_shard_count was dead code suggesting semantics that don't exist; resharding belongs to ReshardOp.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Scatter routes records into exactly num_output_shards buckets via hash(key) % num_output_shards. The reduce stage previously spawned max(input_shards, num_output_shards) tasks; with M input shards and N output shards where M > N, the extra reduce tasks ran on a shard_idx no record hashes to and emitted empty output files. For coderforge normalize this produced 643 empty parquet shards alongside 29 real ones. Adds regression test.
Fixes #5162