Skip to content

Refactor Python Neighborhood Sample #4988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 26 commits into
base: branch-25.06
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove deprecated API
jnke2016 committed Apr 2, 2025
commit 9e37c18bcefdbd12d63592b1f8e0f7827fe9a9c0
139 changes: 30 additions & 109 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
@@ -76,16 +76,14 @@ def create_empty_df_with_edge_props(
weight_t,
return_offsets=False,
renumber=False,
use_legacy_names=True,
include_hop_column=True,
compression="COO",
):
if compression != "COO":
majors_name = "major_offsets"
else:
majors_name = src_n if use_legacy_names else "majors"
majors_name = "majors"

minors_name = dst_n if use_legacy_names else "minors"
minors_name = "minors"

if renumber:
empty_df_renumber = cudf.DataFrame(
@@ -106,8 +104,7 @@ def create_empty_df_with_edge_props(
}
)

if include_hop_column:
df[hop_id_n] = numpy.empty(shape=0, dtype="int32")
df[hop_id_n] = numpy.empty(shape=0, dtype="int32")

empty_df_offsets = cudf.DataFrame(
{
@@ -160,15 +157,12 @@ def _call_plc_uniform_neighbor_sample(
fanout_vals,
with_replacement,
weight_t,
with_edge_properties,
random_state=None,
return_offsets=False,
return_hops=True,
prior_sources_behavior=None,
deduplicate_sources=False,
renumber=False,
use_legacy_names=True,
include_hop_column=True,
compress_per_hop=False,
compression="COO",
):
@@ -193,7 +187,6 @@ def _call_plc_uniform_neighbor_sample(
h_fan_out=fanout_vals,
with_replacement=with_replacement,
do_expensive_check=False,
with_edge_properties=with_edge_properties,
batch_id_list=batch_id_list_x,
random_state=random_state,
prior_sources_behavior=prior_sources_behavior,
@@ -202,23 +195,19 @@ def _call_plc_uniform_neighbor_sample(
renumber=renumber,
compression=compression,
compress_per_hop=compress_per_hop,
return_dict=True,
)

# have to import here due to circular import issue
from cugraph.sampling.sampling_utilities import (
sampling_results_from_cupy_array_dict,
legacy_sampling_results_from_cupy_array_dict,
)

return sampling_results_from_cupy_array_dict(
return legacy_sampling_results_from_cupy_array_dict(
cupy_array_dict,
weight_t,
len(fanout_vals),
with_edge_properties=with_edge_properties,
return_offsets=return_offsets,
renumber=renumber,
use_legacy_names=use_legacy_names,
include_hop_column=include_hop_column,
)


@@ -234,15 +223,12 @@ def _mg_call_plc_uniform_neighbor_sample(
with_replacement,
weight_t,
indices_t,
with_edge_properties,
random_state,
return_offsets=False,
return_hops=True,
prior_sources_behavior=None,
deduplicate_sources=False,
renumber=False,
use_legacy_names=True,
include_hop_column=True,
compress_per_hop=False,
compression="COO",
):
@@ -268,16 +254,13 @@ def _mg_call_plc_uniform_neighbor_sample(
fanout_vals,
with_replacement,
weight_t=weight_t,
with_edge_properties=with_edge_properties,
# FIXME accept and properly transmute a numpy/cupy random state.
random_state=hash((random_state, w)),
return_offsets=return_offsets,
return_hops=return_hops,
prior_sources_behavior=prior_sources_behavior,
deduplicate_sources=deduplicate_sources,
renumber=renumber,
use_legacy_names=use_legacy_names, # remove in 23.12
include_hop_column=include_hop_column, # remove in 23.12
compress_per_hop=compress_per_hop,
compression=compression,
allow_other_workers=False,
@@ -293,12 +276,8 @@ def _mg_call_plc_uniform_neighbor_sample(
weight_t,
return_offsets=return_offsets,
renumber=renumber,
use_legacy_names=use_legacy_names,
compression=compression,
include_hop_column=include_hop_column,
)
if with_edge_properties
else create_empty_df(indices_t, weight_t)
)
if not isinstance(empty_df, (list, tuple)):
empty_df = [empty_df]
@@ -347,19 +326,16 @@ def uniform_neighbor_sample(
fanout_vals: List[int],
*,
with_replacement: bool = True,
with_edge_properties: bool = False, # deprecated
with_batch_ids: bool = False,
keep_batches_together=False,
min_batch_id=None,
max_batch_id=None,
random_state: int = None,
return_offsets: bool = False,
return_hops: bool = True,
include_hop_column: bool = True, # deprecated
prior_sources_behavior: str = None,
deduplicate_sources: bool = False,
renumber: bool = False,
use_legacy_names=True, # deprecated
compress_per_hop=False,
compression="COO",
_multiple_clients: bool = False,
@@ -384,11 +360,6 @@ def uniform_neighbor_sample(
with_replacement: bool, optional (default=True)
Flag to specify if the random sampling is done with replacement

with_edge_properties: bool, optional (default=False)
Deprecated.
Flag to specify whether to return edge properties (weight, edge id,
edge type, batch id, hop id) with the sampled edges.

with_batch_ids: bool, optional (default=False)
Flag to specify whether batch ids are present in the start_list

@@ -416,12 +387,6 @@ def uniform_neighbor_sample(
corresponding to the hop where the edge appeared.
Defaults to True.

include_hop_column: bool, optional (default=True)
Deprecated. Defaults to True.
If True, will include the hop column even if
return_offsets is True. This option will
be removed in release 23.12.

prior_sources_behavior: str (Optional)
Options are "carryover", and "exclude".
Default will leave the source list as-is.
@@ -440,13 +405,6 @@ def uniform_neighbor_sample(
will return the renumber map and renumber map offsets
as an additional dataframe.

use_legacy_names: bool, optional (default=True)
Whether to use the legacy column names (sources, destinations).
If True, will use "sources" and "destinations" as the column names.
If False, will use "majors" and "minors" as the column names.
Deprecated. Will be removed in release 23.12 in favor of always
using the new names "majors" and "minors".

compress_per_hop: bool, optional (default=False)
Whether to compress globally (default), or to produce a separate
compressed edgelist per hop.
@@ -464,44 +422,34 @@ def uniform_neighbor_sample(
result : dask_cudf.DataFrame or Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]
GPU distributed data frame containing several dask_cudf.Series

If with_edge_properties=True:
ddf['sources']: dask_cudf.Series
If return_offsets=False:
df['majors']: dask_cudf.Series
Contains the source vertices from the sampling result
ddf['destinations']: dask_cudf.Series
df['minors']: dask_cudf.Series
Contains the destination vertices from the sampling result
ddf['indices']: dask_cudf.Series
Contains the indices from the sampling result for path
reconstruction

If with_edge_properties=False:
If return_offsets=False:
df['sources']: dask_cudf.Series
Contains the source vertices from the sampling result
df['destinations']: dask_cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: dask_cudf.Series
Contains the edge types from the sampling result
df['batch_id']: dask_cudf.Series
Contains the batch ids from the sampling result
df['hop_id']: dask_cudf.Series
Contains the hop ids from the sampling result
If renumber=True:
(adds the following dataframe)
renumber_df['map']: dask_cudf.Series
Contains the renumber maps for each batch
renumber_df['offsets']: dask_cudf.Series
Contains the batch offsets for the renumber maps
df['weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: dask_cudf.Series
Contains the edge types from the sampling result
df['batch_id']: dask_cudf.Series
Contains the batch ids from the sampling result
df['hop_id']: dask_cudf.Series
Contains the hop ids from the sampling result
If renumber=True:
(adds the following dataframe)
renumber_df['map']: dask_cudf.Series
Contains the renumber maps for each batch
renumber_df['offsets']: dask_cudf.Series
Contains the batch offsets for the renumber maps

If return_offsets=True:
df['sources']: dask_cudf.Series
df['majors']: dask_cudf.Series
Contains the source vertices from the sampling result
df['destinations']: dask_cudf.Series
df['minors']: dask_cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: dask_cudf.Series
df['weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
@@ -525,13 +473,6 @@ def uniform_neighbor_sample(
if compression not in ["COO", "CSR", "CSC", "DCSR", "DCSC"]:
raise ValueError("compression must be one of COO, CSR, CSC, DCSR, or DCSC")

if with_edge_properties:
warning_msg = (
"The with_edge_properties flag is deprecated"
" and will be removed in the next release."
)
warnings.warn(warning_msg, FutureWarning)

if (
(compression != "COO")
and (not compress_per_hop)
@@ -550,19 +491,6 @@ def uniform_neighbor_sample(
" of the libcugraph C++ API"
)

if include_hop_column:
warning_msg = (
"The include_hop_column flag is deprecated and will be"
" removed in the next release in favor of always "
"excluding the hop column when return_offsets is True"
)
warnings.warn(warning_msg, FutureWarning)

if compression != "COO":
raise ValueError(
"Including the hop id column is only supported with COO compression."
)

if isinstance(start_list, int):
start_list = [start_list]

@@ -573,7 +501,7 @@ def uniform_neighbor_sample(
input_graph.renumber_map.renumbered_src_col_name
].dtype,
)
elif with_edge_properties and not with_batch_ids:
elif not with_batch_ids:
if isinstance(start_list, (cudf.DataFrame, dask_cudf.DataFrame)):
raise ValueError("expected 1d input for start list without batch ids")

@@ -663,15 +591,12 @@ def uniform_neighbor_sample(
"with_replacement": with_replacement,
"weight_t": weight_t,
"indices_t": indices_t,
"with_edge_properties": with_edge_properties,
"random_state": random_state,
"return_offsets": return_offsets,
"return_hops": return_hops,
"prior_sources_behavior": prior_sources_behavior,
"deduplicate_sources": deduplicate_sources,
"renumber": renumber,
"use_legacy_names": use_legacy_names,
"include_hop_column": include_hop_column,
"compress_per_hop": compress_per_hop,
"compression": compression,
}
@@ -703,12 +628,8 @@ def uniform_neighbor_sample(
ddf, renumber_df = ddf

if input_graph.renumbered and not renumber:
if use_legacy_names:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True)
else:
ddf = input_graph.unrenumber(ddf, "majors", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "minors", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "majors", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "minors", preserve_order=True)
if return_offsets:
if renumber:
return ddf, offsets_df, renumber_df
Loading