Create PackedData from cudf PackedColumns #228

Open · nirandaperera wants to merge 18 commits into branch-25.06

Conversation

nirandaperera (Contributor):

The current PackedData Cython API does not contain a constructor that takes in cudf packed columns. This PR adds one.
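As context, a minimal usage sketch of the added API (the `from_cudf_packed_columns` name matches the diff below; the import paths and the `table` variable are assumptions, not taken from this PR):

```python
# Hedged sketch: module paths are assumed, not taken from this PR.
from pylibcudf.contiguous_split import pack           # pack() returns a PackedColumns
from rapidsmpf.buffer.packed_data import PackedData   # assumed location of PackedData

packed_columns = pack(table)  # `table` is an existing pylibcudf Table
packed_data = PackedData.from_cudf_packed_columns(packed_columns)
# The conversion moves the metadata and GPU data out of `packed_columns`,
# so the original object should not be used afterwards.
```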

@nirandaperera nirandaperera requested a review from a team as a code owner April 24, 2025 23:41
@nirandaperera nirandaperera added the improvement (Improves an existing functionality) and non-breaking (Introduces a non-breaking change) labels Apr 24, 2025
@nirandaperera nirandaperera requested a review from wence- April 24, 2025 23:42
Comment on lines 19 to 22
def __init__(self, PackedColumns packed_columns) -> None:
    self.c_obj = make_unique[cpp_PackedData](
        move(deref(packed_columns.c_obj).metadata),
        move(deref(packed_columns.c_obj).gpu_data))
Contributor:

This needs to very clearly document that it takes ownership of the data in the passed-in PackedColumns object.
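For illustration, the ownership note could be expressed roughly like this (a hedged sketch of the constructor shown above, not the PR's actual docstring):

```python
def __init__(self, PackedColumns packed_columns) -> None:
    """Construct a PackedData from a pylibcudf PackedColumns.

    Notes
    -----
    This constructor takes ownership of (moves) the metadata and GPU data
    held by ``packed_columns``. The passed-in object is left empty and must
    not be used afterwards.
    """
    self.c_obj = make_unique[cpp_PackedData](
        move(deref(packed_columns.c_obj).metadata),
        move(deref(packed_columns.c_obj).gpu_data))
```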

@nirandaperera nirandaperera requested a review from wence- April 25, 2025 15:30
@nirandaperera (Contributor Author):

/ok to test

Comment on lines +31 to +32
move(deref(packed_columns.c_obj).metadata),
move(deref(packed_columns.c_obj).gpu_data))
Member:

Use packed_columns.release(), which raises if packed_columns has already been released.

Contributor Author:

As we discussed offline, packed_columns.release might not work here, so the empty check was added instead.

Member:

Ok, but why might it not work? Should we add a comment why then?

Contributor Author:

packed_columns.release returns a tuple[memoryview, gpumemoryview]. There are no API endpoints to convert these views back into a vector and an rmm device buffer.
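A rough sketch of the empty-check approach mentioned above, under the assumption that `c_obj` and the underlying metadata are held via `unique_ptr`s (not the PR's exact code):

```cython
# Hypothetical guard (the real check in the PR may differ): refuse to build a
# PackedData from a PackedColumns whose contents were already moved out.
if packed_columns.c_obj.get() == NULL or deref(packed_columns.c_obj).metadata.get() == NULL:
    raise ValueError("Cannot create PackedData from an empty PackedColumns")
self.c_obj = make_unique[cpp_PackedData](
    move(deref(packed_columns.c_obj).metadata),
    move(deref(packed_columns.c_obj).gpu_data))
```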

@nirandaperera nirandaperera requested a review from madsbk April 28, 2025 20:13
@madsbk (Member) left a comment:

Looks good, only some minor suggestions.

@@ -13,3 +15,39 @@ cdef class PackedData:
        cdef PackedData self = PackedData.__new__(PackedData)
        self.c_obj = move(obj)
        return self

    @staticmethod
    def from_cudf_packed_columns(PackedColumns packed_columns) -> PackedData:
Member:

No type annotations in pyx

Suggested change:
-    def from_cudf_packed_columns(PackedColumns packed_columns) -> PackedData:
+    def from_cudf_packed_columns(PackedColumns packed_columns):

@nirandaperera nirandaperera requested a review from madsbk May 9, 2025 17:18
@pentschev (Member) left a comment:

Thanks @nirandaperera, left a few comments.


class PackedData:
    pass
    def __init__(self) -> None: ...
    @staticmethod
Member:

Should this be @classmethod like in the pyx file?

Contributor Author:

yes, good catch. Thanks @pentschev
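A sketch of how the stub could read after that change (assumed wording, for illustration only):

```python
# Sketch of the .pyi stub after applying the suggestion (assumed wording).
from pylibcudf.contiguous_split import PackedColumns  # assumed import path

class PackedData:
    def __init__(self) -> None: ...
    @classmethod
    def from_cudf_packed_columns(cls, packed_columns: PackedColumns) -> PackedData: ...
```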

Comment on lines -157 to +184
-try:
-    consumer_thread.start()
-
-    n_parts_local = local_size // part_size
-
-    # simulate a hash partition by splitting a partition into total_num_partitions
-    split_size = part_size // output_nparts
-    dummy_table, _ = generate_partition(split_size).to_pylibcudf()
-
-    comm.logger.print(
-        f"num local partitions:{n_parts_local} split size:{split_size}"
-    )
-    for p in range(n_parts_local):
-        # generate chunks for a single local partition by deep copying the dummy table as packed columns
-        # NOTE: This would require part_size amount of GPU memory.
-        chunks: dict[int, PackedColumns] = {}
-        for i in range(output_nparts):
-            chunks[i] = pack(dummy_table)
-
-        if p > 0 and insert_delay_ms > 0:
-            time.sleep(insert_delay_ms / 1000)
-
-        shuffler.insert_chunks(chunks)
-
-    # finish inserting all partitions
-    for i in range(output_nparts):
-        shuffler.insert_finished(i)
-
-finally:
-    # wait for all partitions to be consumed
-    consumer_thread.join(timeout=wait_timeout)
+# start the consumer thread. This will wait for any finished partition.
+consumer_thread.start()
+
+n_parts_local = local_size // part_size
+
+# simulate a hash partition by splitting a partition into total_num_partitions
+split_size = part_size // output_nparts
+dummy_table, _ = generate_partition(split_size).to_pylibcudf()
+
+comm.logger.print(f"num local partitions:{n_parts_local} split size:{split_size}")
+for p in range(n_parts_local):
+    # generate chunks for a single local partition by deep copying the dummy table
+    # as packed columns
+    # NOTE: This will require part_size amount of GPU memory.
+    chunks: dict[int, PackedData] = {}
+    for i in range(output_nparts):
+        chunks[i] = PackedData.from_cudf_packed_columns(pack(dummy_table))
+
+    if p > 0 and insert_delay_ms > 0:
+        time.sleep(insert_delay_ms / 1000)
+
+    shuffler.insert_chunks(chunks)
+
+# finish inserting all partitions
+for i in range(output_nparts):
+    shuffler.insert_finished(i)
+
+# wait for the consumer thread to finish.
+consumer_thread.join(timeout=wait_timeout)
Member:

What a horrible diff rendering!

nirandaperera and others added 3 commits May 9, 2025 14:02
Co-authored-by: Peter Andreas Entschev <[email protected]>
Signed-off-by: niranda perera <[email protected]>
…ra/rapidsmpf into packed_data_from_packed_cols
@nirandaperera nirandaperera requested a review from pentschev May 9, 2025 21:51
Signed-off-by: niranda perera <[email protected]>
@pentschev (Member) left a comment:

LGTM, thanks @nirandaperera!

Labels: improvement (Improves an existing functionality), non-breaking (Introduces a non-breaking change)
4 participants