Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Uniform distribute_buffer_sizes for calculating buffer size #94

Closed
wants to merge 3 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 2 additions & 43 deletions distributed_shampoo/utils/shampoo_ddp_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

"""

import heapq
import logging
import operator
from functools import partial
from itertools import islice
from typing import Any
Expand All @@ -26,6 +24,7 @@
from distributed_shampoo.utils.shampoo_distributor import DistributorInterface
from distributed_shampoo.utils.shampoo_utils import (
compress_list,
distribute_buffer_sizes,
generate_pairwise_indices,
get_dtype_size,
)
Expand Down Expand Up @@ -221,47 +220,7 @@ def _distribute_buffer_sizes(
-> buffer_size_ranks = [(128, 1), (64, 1), (512, 0), (256, 1)]

"""

ALIGNMENT_BYTES = (
64 # necessary for determining buffer size, possibly hardware-dependent
)

# Convert each of buffer_sizes into smallest multiple of ALIGNMENT_BYTES that is >= buffer size.
aligned_buffer_sizes = [
(buffer_size + ALIGNMENT_BYTES - 1) // ALIGNMENT_BYTES * ALIGNMENT_BYTES
for buffer_size in buffer_sizes
]
buffer_size_ranks = [(-1, -1)] * len(buffer_sizes)
allocated_buffer_sizes = [
(0, group_index) for group_index in range(self._group_size)
]
heapq.heapify(allocated_buffer_sizes)

for index, aligned_buffer_size in sorted(
enumerate(aligned_buffer_sizes),
key=operator.itemgetter(1),
reverse=True,
):
# Greedily find the group with the least allocated buffer size and its group index
# in order to allocate buffers on that group.
(
min_allocated_buffer_size,
min_allocated_buffer_size_group_index,
) = heapq.heappop(allocated_buffer_sizes)

heapq.heappush(
allocated_buffer_sizes,
(
min_allocated_buffer_size + aligned_buffer_size,
min_allocated_buffer_size_group_index,
),
)
buffer_size_ranks[index] = (
aligned_buffer_size,
min_allocated_buffer_size_group_index,
)

return tuple(buffer_size_ranks)
return distribute_buffer_sizes(buffer_sizes, self._group_size)

@torch.no_grad()
def _construct_global_block_info_list(
Expand Down
44 changes: 2 additions & 42 deletions distributed_shampoo/utils/shampoo_hsdp_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

"""

import heapq
import logging
import operator
from functools import partial
from itertools import islice
from math import prod
Expand All @@ -29,6 +27,7 @@
from distributed_shampoo.utils.shampoo_distributor import DistributorInterface
from distributed_shampoo.utils.shampoo_utils import (
compress_list,
distribute_buffer_sizes,
generate_pairwise_indices,
get_dtype_size,
merge_small_dims,
Expand Down Expand Up @@ -315,46 +314,7 @@ def _distribute_buffer_sizes(
-> buffer_size_ranks = [(128, 1), (64, 1), (512, 0), (256, 1)]

"""
ALIGNMENT_BYTES = (
64 # necessary for determining buffer size, possibly hardware-dependent
)

# Convert each of buffer_sizes into smallest multiple of ALIGNMENT_BYTES that is >= buffer size.
aligned_buffer_sizes = [
(buffer_size + ALIGNMENT_BYTES - 1) // ALIGNMENT_BYTES * ALIGNMENT_BYTES
for buffer_size in buffer_sizes
]
buffer_size_ranks = [(-1, -1)] * len(buffer_sizes)
allocated_buffer_sizes = [
(0, group_index) for group_index in range(self._dist_group_size)
]
heapq.heapify(allocated_buffer_sizes)

for index, aligned_buffer_size in sorted(
enumerate(aligned_buffer_sizes),
key=operator.itemgetter(1),
reverse=True,
):
# Greedily find the group with the least allocated buffer size and its group index
# in order to allocate buffers on that group.
(
min_allocated_buffer_size,
min_allocated_buffer_size_group_index,
) = heapq.heappop(allocated_buffer_sizes)

heapq.heappush(
allocated_buffer_sizes,
(
min_allocated_buffer_size + aligned_buffer_size,
min_allocated_buffer_size_group_index,
),
)
buffer_size_ranks[index] = (
aligned_buffer_size,
min_allocated_buffer_size_group_index,
)

return tuple(buffer_size_ranks)
return distribute_buffer_sizes(buffer_sizes, self._dist_group_size)

def _construct_composable_block_ids(
self,
Expand Down
45 changes: 2 additions & 43 deletions distributed_shampoo/utils/shampoo_hybrid_shard_distributor.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,7 @@

"""

import heapq
import logging
import operator
from functools import partial
from itertools import islice
from typing import Any, Iterable
Expand All @@ -25,6 +23,7 @@
from distributed_shampoo.utils.shampoo_distributor import DistributorInterface
from distributed_shampoo.utils.shampoo_utils import (
compress_list,
distribute_buffer_sizes,
generate_pairwise_indices,
get_dtype_size,
)
Expand Down Expand Up @@ -317,48 +316,8 @@ def _distribute_buffer_sizes(
Example:
Assuming ALIGNMENT_BYTES = 64, given buffer_sizes = [128, 64, 500, 256], group_size = 2
-> buffer_size_ranks = [(128, 1), (64, 1), (512, 0), (256, 1)]

"""
ALIGNMENT_BYTES = (
64 # necessary for determining buffer size, possibly hardware-dependent
)

# Convert each of buffer_sizes into smallest multiple of ALIGNMENT_BYTES that is >= buffer size.
aligned_buffer_sizes = [
(buffer_size + ALIGNMENT_BYTES - 1) // ALIGNMENT_BYTES * ALIGNMENT_BYTES
for buffer_size in buffer_sizes
]
buffer_size_ranks = [(-1, -1)] * len(buffer_sizes)
allocated_buffer_sizes = [
(0, group_index) for group_index in range(self._dist_group_size)
]
heapq.heapify(allocated_buffer_sizes)

for index, aligned_buffer_size in sorted(
enumerate(aligned_buffer_sizes),
key=operator.itemgetter(1),
reverse=True,
):
# Greedily find the group with the least allocated buffer size and its group index
# in order to allocate buffers on that group.
(
min_allocated_buffer_size,
min_allocated_buffer_size_group_index,
) = heapq.heappop(allocated_buffer_sizes)

heapq.heappush(
allocated_buffer_sizes,
(
min_allocated_buffer_size + aligned_buffer_size,
min_allocated_buffer_size_group_index,
),
)
buffer_size_ranks[index] = (
aligned_buffer_size,
min_allocated_buffer_size_group_index,
)

return tuple(buffer_size_ranks)
return distribute_buffer_sizes(buffer_sizes, self._dist_group_size)

def _construct_composable_block_ids(
self,
Expand Down
68 changes: 68 additions & 0 deletions distributed_shampoo/utils/shampoo_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@

"""

import heapq
import math
import operator
from collections.abc import Callable, Iterator, Sequence
from functools import partial, reduce
from itertools import accumulate, chain, compress, pairwise
Expand Down Expand Up @@ -151,3 +153,69 @@ def __exit__(
exc_tb: TracebackType | None,
) -> None:
self._exit_method()


def distribute_buffer_sizes(
    buffer_sizes: tuple[int, ...],
    group_size: int,
) -> tuple[tuple[int, int], ...]:
    """Distribute given buffer sizes across ranks in a group.

    Buffer sizes will be rounded up for memory allocation. Buffers are distributed such that
    total buffer sizes of each rank are as even as possible. This is currently performed
    using a greedy algorithm. We do not currently consider computational cost
    or kernel launching overheads.

    Note: A better distribution strategy should try to minimize the delta of buffer sizes
    between the most and the least allocated groups.

    Args:
        buffer_sizes (tuple[int, ...]): Buffer sizes of blocks to be distributed.
        group_size (int): Number of groups to distribute across.

    Returns:
        buffer_size_ranks (tuple[tuple[int, int], ...]): A list of tuples containing the
            buffer size for each block and its assigned rank.

    Example:
        Assuming ALIGNMENT_BYTES = 64, given buffer_sizes = [128, 64, 500, 256], group_size = 2
        -> buffer_size_ranks = [(128, 1), (64, 1), (512, 0), (256, 1)]
    """
    # Necessary for determining buffer size; possibly hardware-dependent.
    ALIGNMENT_BYTES = 64

    def _align(size: int) -> int:
        # Round up to the smallest multiple of ALIGNMENT_BYTES that is >= size.
        return -(-size // ALIGNMENT_BYTES) * ALIGNMENT_BYTES

    assignments: list[tuple[int, int]] = [(-1, -1)] * len(buffer_sizes)

    # Min-heap of (total allocated bytes, group index); ties favor the lowest group index.
    group_loads = [(0, group_index) for group_index in range(group_size)]
    heapq.heapify(group_loads)

    # Place the largest buffers first so each greedy choice balances group totals
    # (sorted() is stable, so equally-sized buffers keep their original order).
    placement_order = sorted(
        ((index, _align(size)) for index, size in enumerate(buffer_sizes)),
        key=operator.itemgetter(1),
        reverse=True,
    )
    for index, aligned_size in placement_order:
        # Assign this buffer to the currently least-loaded group, then
        # push the group back with its updated total.
        load, group_index = heapq.heappop(group_loads)
        heapq.heappush(group_loads, (load + aligned_size, group_index))
        assignments[index] = (aligned_size, group_index)

    return tuple(assignments)