17 changes: 14 additions & 3 deletions docs/source/advanced/epochized_blending.md
@@ -25,20 +25,24 @@ splits:
split_part: val
```

Now, the call to `get_train_dataset` requires the additional parameter `repeat=False` to interrupt iterating after one epoch:
Now, the call to `get_train_dataset` requires the additional parameter `repeat=False` to interrupt iterating after one epoch.
In addition, the `RedistributeLoader` is used to synchronize the last samples when some ranks have already exhausted their data:

```py
from megatron.energon import get_train_dataset, get_loader, WorkerConfig
from megatron.energon.sync_end import RedistributeLoader


loader = get_loader(get_train_dataset(
# Set repeat=False to avoid repeating the dataset.
# Use RedistributeLoader to synchronize ranks when some exhaust their data before others. Only works if torch distributed is initialized.
loader = RedistributeLoader(get_loader(get_train_dataset(
Collaborator commented:
I think it would be better if this was transparent to the user, so make get_loader handle this internally.
And since the choice of RedistributeLoader vs. StopFirstLoader actually changes the data that's being iterated, this choice should be made in the metadataset and not in the code, I think.
As a property of blend_epochized.

I.e. blend_epochized can either be a list as before (chooses default RedistributeLoader), or it can be a dict for more customization like

    blend_epochized:
      phase_out_behavior: stop_first_loader
      datasets:
        - repetitions: 5
          path: ./coco
          # ... Other parameters
        - repetitions: 2
          path: ./coyo
        - repetitions: 1
          path: ./coyo
          split_part: val

Collaborator (author) replied:

Okay, we can make this be handled in get_loader. I personally would prefer it to be separate though, as it's a feature on top?

Regarding moving the configuration to the metadataset:
I see your point that this slightly modifies how the data is iterated, but I'd also argue:

  1. So far we don't really rely on torch distributed, whereas this piece of code is tightly bound to it.
  2. This would also disable nesting of blend_epochized, because you cannot nest different (or unconfigured) phase_out_behavior.
  3. This depends on repeat=False and doesn't make sense if repeat=True, so it's based on what the user sets in the code.
  4. At least for RedistributeLoader it should not really change the data frequency (so far the settings in the metadataset mainly focus on data frequency / blend).
  5. If we move the boundary of metadataset to include this, then we should also have gradient accumulation, seeds, batch size, handling of incomplete batches, etc. in the config. I wouldn't want that, tbh.

Thus voting for keeping this in code, not in the metadataset config.

'metadataset.yaml',
batch_size=2,
shuffle_buffer_size=100,
max_samples_per_sequence=100,
worker_config=WorkerConfig.default_worker_config(),
repeat=False,
))
)))

# This will now stop iterating after the datasets have been iterated (coco 5 times, coyo-train 2
# times and coyo-val 1 time). Of course, the data is still being shuffled between all those
@@ -54,3 +58,10 @@ for batch in loader:

If used as a dataset for `get_val_dataset`, the `repetitions` are ignored.
The metadataset would also work without setting `repeat=False`, but then the shuffle buffer will shuffle samples across epoch boundaries.
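
As a minimal sketch, a validation loader could be built from the same metadataset (assuming `get_val_dataset` accepts the path, `batch_size` and `worker_config` arguments shown here):

```py
from megatron.energon import get_val_dataset, get_loader, WorkerConfig

# The `repetitions` from blend_epochized are ignored for validation.
val_loader = get_loader(get_val_dataset(
    'metadataset.yaml',
    batch_size=2,
    worker_config=WorkerConfig.default_worker_config(),
))
```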

## Sample Redistribution

There are two options for handling the end of the data iteration:

* `RedistributeLoader`: Redistributes samples when a rank is exhausted before the other ranks. On the next epoch, the samples from the incomplete batches are included.
* `StopFirstLoader`: Stops as soon as the first rank is exhausted. The next epoch will iterate until the next loader stops, restarting all ranks once (see the sketch below).
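
A minimal sketch of the `StopFirstLoader` variant, assuming it wraps the loader the same way `RedistributeLoader` does in the example above:

```py
from megatron.energon import get_train_dataset, get_loader, WorkerConfig
from megatron.energon.sync_end import StopFirstLoader

# Stop all ranks as soon as the first rank runs out of data.
# Assumes StopFirstLoader wraps a loader just like RedistributeLoader above.
loader = StopFirstLoader(get_loader(get_train_dataset(
    'metadataset.yaml',
    batch_size=2,
    shuffle_buffer_size=100,
    max_samples_per_sequence=100,
    worker_config=WorkerConfig.default_worker_config(),
    repeat=False,
)))
```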
21 changes: 21 additions & 0 deletions src/megatron/energon/sync_end/__init__.py
@@ -0,0 +1,21 @@
# Copyright (c) 2025, NVIDIA CORPORATION.
# SPDX-License-Identifier: BSD-3-Clause
from megatron.energon.sync_end.redistribute import RedistributeDataLoaderState, RedistributeLoader
from megatron.energon.sync_end.stop_first_end import StopFirstDataLoaderState, StopFirstLoader

"""
Provides wrappers for the dataset loaders that allow for synchronization at the end of the dataset.
I.e. when running a training with repeat=False, the loaders on different ranks will typically be exhausted at different times,
which may require synchronization across ranks.

The wrappers are:
- RedistributeLoader: Redistributes the last samples to the ranks that are not exhausted.
- StopFirstLoader: Stops iterating as soon as the first rank is exhausted.
"""

__all__ = [
"RedistributeLoader",
"RedistributeDataLoaderState",
"StopFirstLoader",
"StopFirstDataLoaderState",
]
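
For illustration, a hypothetical helper mirroring the `phase_out_behavior` idea from the review discussion (the `wrap_loader` function and its argument values are assumptions, not part of the package; both wrappers are assumed to take the loader as their only required argument):

```py
from megatron.energon.sync_end import RedistributeLoader, StopFirstLoader


def wrap_loader(loader, phase_out_behavior="redistribute"):
    # RedistributeLoader hands leftover samples to ranks that still have data;
    # StopFirstLoader ends the pass as soon as any rank is exhausted.
    if phase_out_behavior == "redistribute":
        return RedistributeLoader(loader)
    elif phase_out_behavior == "stop_first":
        return StopFirstLoader(loader)
    raise ValueError(f"Unknown phase_out_behavior: {phase_out_behavior!r}")
```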