Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Python Neighborhood Sample #4988

Open
wants to merge 26 commits into
base: branch-25.06
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions python/cugraph/cugraph/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@
rw_path,
node2vec,
uniform_neighbor_sample,
homogeneous_neighbor_sample,
heterogeneous_neighbor_sample,
)


Expand Down
157 changes: 37 additions & 120 deletions python/cugraph/cugraph/dask/sampling/uniform_neighbor_sample.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION.
# Copyright (c) 2022-2025, NVIDIA CORPORATION.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -14,8 +14,6 @@

from __future__ import annotations

import warnings

import numpy
from dask import delayed
from dask.distributed import Lock, get_client, wait
Expand Down Expand Up @@ -76,16 +74,14 @@ def create_empty_df_with_edge_props(
weight_t,
return_offsets=False,
renumber=False,
use_legacy_names=True,
include_hop_column=True,
compression="COO",
):
if compression != "COO":
majors_name = "major_offsets"
else:
majors_name = src_n if use_legacy_names else "majors"
majors_name = "majors"

minors_name = dst_n if use_legacy_names else "minors"
minors_name = "minors"

if renumber:
empty_df_renumber = cudf.DataFrame(
Expand All @@ -106,8 +102,7 @@ def create_empty_df_with_edge_props(
}
)

if include_hop_column:
df[hop_id_n] = numpy.empty(shape=0, dtype="int32")
df[hop_id_n] = numpy.empty(shape=0, dtype="int32")

empty_df_offsets = cudf.DataFrame(
{
Expand Down Expand Up @@ -160,15 +155,12 @@ def _call_plc_uniform_neighbor_sample(
fanout_vals,
with_replacement,
weight_t,
with_edge_properties,
random_state=None,
return_offsets=False,
return_hops=True,
prior_sources_behavior=None,
deduplicate_sources=False,
renumber=False,
use_legacy_names=True,
include_hop_column=True,
compress_per_hop=False,
compression="COO",
):
Expand All @@ -193,7 +185,6 @@ def _call_plc_uniform_neighbor_sample(
h_fan_out=fanout_vals,
with_replacement=with_replacement,
do_expensive_check=False,
with_edge_properties=with_edge_properties,
batch_id_list=batch_id_list_x,
random_state=random_state,
prior_sources_behavior=prior_sources_behavior,
Expand All @@ -202,23 +193,19 @@ def _call_plc_uniform_neighbor_sample(
renumber=renumber,
compression=compression,
compress_per_hop=compress_per_hop,
return_dict=True,
)

# have to import here due to circular import issue
from cugraph.sampling.sampling_utilities import (
sampling_results_from_cupy_array_dict,
legacy_sampling_results_from_cupy_array_dict,
)

return sampling_results_from_cupy_array_dict(
return legacy_sampling_results_from_cupy_array_dict(
cupy_array_dict,
weight_t,
len(fanout_vals),
with_edge_properties=with_edge_properties,
return_offsets=return_offsets,
renumber=renumber,
use_legacy_names=use_legacy_names,
include_hop_column=include_hop_column,
)


Expand All @@ -234,15 +221,12 @@ def _mg_call_plc_uniform_neighbor_sample(
with_replacement,
weight_t,
indices_t,
with_edge_properties,
random_state,
return_offsets=False,
return_hops=True,
prior_sources_behavior=None,
deduplicate_sources=False,
renumber=False,
use_legacy_names=True,
include_hop_column=True,
compress_per_hop=False,
compression="COO",
):
Expand All @@ -268,16 +252,13 @@ def _mg_call_plc_uniform_neighbor_sample(
fanout_vals,
with_replacement,
weight_t=weight_t,
with_edge_properties=with_edge_properties,
# FIXME accept and properly transmute a numpy/cupy random state.
random_state=hash((random_state, w)),
return_offsets=return_offsets,
return_hops=return_hops,
prior_sources_behavior=prior_sources_behavior,
deduplicate_sources=deduplicate_sources,
renumber=renumber,
use_legacy_names=use_legacy_names, # remove in 23.12
include_hop_column=include_hop_column, # remove in 23.12
compress_per_hop=compress_per_hop,
compression=compression,
allow_other_workers=False,
Expand All @@ -287,18 +268,12 @@ def _mg_call_plc_uniform_neighbor_sample(
]
del ddf

empty_df = (
create_empty_df_with_edge_props(
indices_t,
weight_t,
return_offsets=return_offsets,
renumber=renumber,
use_legacy_names=use_legacy_names,
compression=compression,
include_hop_column=include_hop_column,
)
if with_edge_properties
else create_empty_df(indices_t, weight_t)
empty_df = create_empty_df_with_edge_props(
indices_t,
weight_t,
return_offsets=return_offsets,
renumber=renumber,
compression=compression,
)
if not isinstance(empty_df, (list, tuple)):
empty_df = [empty_df]
Expand Down Expand Up @@ -347,19 +322,16 @@ def uniform_neighbor_sample(
fanout_vals: List[int],
*,
with_replacement: bool = True,
with_edge_properties: bool = False, # deprecated
with_batch_ids: bool = False,
keep_batches_together=False,
min_batch_id=None,
max_batch_id=None,
random_state: int = None,
return_offsets: bool = False,
return_hops: bool = True,
include_hop_column: bool = True, # deprecated
prior_sources_behavior: str = None,
deduplicate_sources: bool = False,
renumber: bool = False,
use_legacy_names=True, # deprecated
compress_per_hop=False,
compression="COO",
_multiple_clients: bool = False,
Expand All @@ -384,11 +356,6 @@ def uniform_neighbor_sample(
with_replacement: bool, optional (default=True)
Flag to specify if the random sampling is done with replacement

with_edge_properties: bool, optional (default=False)
Deprecated.
Flag to specify whether to return edge properties (weight, edge id,
edge type, batch id, hop id) with the sampled edges.

with_batch_ids: bool, optional (default=False)
Flag to specify whether batch ids are present in the start_list

Expand Down Expand Up @@ -416,12 +383,6 @@ def uniform_neighbor_sample(
corresponding to the hop where the edge appeared.
Defaults to True.

include_hop_column: bool, optional (default=True)
Deprecated. Defaults to True.
If True, will include the hop column even if
return_offsets is True. This option will
be removed in release 23.12.

prior_sources_behavior: str (Optional)
Options are "carryover", and "exclude".
Default will leave the source list as-is.
Expand All @@ -440,13 +401,6 @@ def uniform_neighbor_sample(
will return the renumber map and renumber map offsets
as an additional dataframe.

use_legacy_names: bool, optional (default=True)
Whether to use the legacy column names (sources, destinations).
If True, will use "sources" and "destinations" as the column names.
If False, will use "majors" and "minors" as the column names.
Deprecated. Will be removed in release 23.12 in favor of always
using the new names "majors" and "minors".

compress_per_hop: bool, optional (default=False)
Whether to compress globally (default), or to produce a separate
compressed edgelist per hop.
Expand All @@ -464,44 +418,34 @@ def uniform_neighbor_sample(
result : dask_cudf.DataFrame or Tuple[dask_cudf.DataFrame, dask_cudf.DataFrame]
GPU distributed data frame containing several dask_cudf.Series

If with_edge_properties=True:
ddf['sources']: dask_cudf.Series
If return_offsets=False:
df['majors']: dask_cudf.Series
Contains the source vertices from the sampling result
ddf['destinations']: dask_cudf.Series
df['minors']: dask_cudf.Series
Contains the destination vertices from the sampling result
ddf['indices']: dask_cudf.Series
Contains the indices from the sampling result for path
reconstruction

If with_edge_properties=False:
If return_offsets=False:
df['sources']: dask_cudf.Series
Contains the source vertices from the sampling result
df['destinations']: dask_cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: dask_cudf.Series
Contains the edge types from the sampling result
df['batch_id']: dask_cudf.Series
Contains the batch ids from the sampling result
df['hop_id']: dask_cudf.Series
Contains the hop ids from the sampling result
If renumber=True:
(adds the following dataframe)
renumber_df['map']: dask_cudf.Series
Contains the renumber maps for each batch
renumber_df['offsets']: dask_cudf.Series
Contains the batch offsets for the renumber maps
df['weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
df['edge_type']: dask_cudf.Series
Contains the edge types from the sampling result
df['batch_id']: dask_cudf.Series
Contains the batch ids from the sampling result
df['hop_id']: dask_cudf.Series
Contains the hop ids from the sampling result
If renumber=True:
(adds the following dataframe)
renumber_df['map']: dask_cudf.Series
Contains the renumber maps for each batch
renumber_df['offsets']: dask_cudf.Series
Contains the batch offsets for the renumber maps

If return_offsets=True:
df['sources']: dask_cudf.Series
df['majors']: dask_cudf.Series
Contains the source vertices from the sampling result
df['destinations']: dask_cudf.Series
df['minors']: dask_cudf.Series
Contains the destination vertices from the sampling result
df['edge_weight']: dask_cudf.Series
df['weight']: dask_cudf.Series
Contains the edge weights from the sampling result
df['edge_id']: dask_cudf.Series
Contains the edge ids from the sampling result
Expand All @@ -525,13 +469,6 @@ def uniform_neighbor_sample(
if compression not in ["COO", "CSR", "CSC", "DCSR", "DCSC"]:
raise ValueError("compression must be one of COO, CSR, CSC, DCSR, or DCSC")

if with_edge_properties:
warning_msg = (
"The with_edge_properties flag is deprecated"
" and will be removed in the next release."
)
warnings.warn(warning_msg, FutureWarning)

if (
(compression != "COO")
and (not compress_per_hop)
Expand All @@ -550,19 +487,6 @@ def uniform_neighbor_sample(
" of the libcugraph C++ API"
)

if include_hop_column:
warning_msg = (
"The include_hop_column flag is deprecated and will be"
" removed in the next release in favor of always "
"excluding the hop column when return_offsets is True"
)
warnings.warn(warning_msg, FutureWarning)

if compression != "COO":
raise ValueError(
"Including the hop id column is only supported with COO compression."
)

if isinstance(start_list, int):
start_list = [start_list]

Expand All @@ -573,7 +497,7 @@ def uniform_neighbor_sample(
input_graph.renumber_map.renumbered_src_col_name
].dtype,
)
elif with_edge_properties and not with_batch_ids:
elif not with_batch_ids:
if isinstance(start_list, (cudf.DataFrame, dask_cudf.DataFrame)):
raise ValueError("expected 1d input for start list without batch ids")

Expand Down Expand Up @@ -663,15 +587,12 @@ def uniform_neighbor_sample(
"with_replacement": with_replacement,
"weight_t": weight_t,
"indices_t": indices_t,
"with_edge_properties": with_edge_properties,
"random_state": random_state,
"return_offsets": return_offsets,
"return_hops": return_hops,
"prior_sources_behavior": prior_sources_behavior,
"deduplicate_sources": deduplicate_sources,
"renumber": renumber,
"use_legacy_names": use_legacy_names,
"include_hop_column": include_hop_column,
"compress_per_hop": compress_per_hop,
"compression": compression,
}
Expand Down Expand Up @@ -703,12 +624,8 @@ def uniform_neighbor_sample(
ddf, renumber_df = ddf

if input_graph.renumbered and not renumber:
if use_legacy_names:
ddf = input_graph.unrenumber(ddf, "sources", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "destinations", preserve_order=True)
else:
ddf = input_graph.unrenumber(ddf, "majors", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "minors", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "majors", preserve_order=True)
ddf = input_graph.unrenumber(ddf, "minors", preserve_order=True)
if return_offsets:
if renumber:
return ddf, offsets_df, renumber_df
Expand Down
2 changes: 2 additions & 0 deletions python/cugraph/cugraph/sampling/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,5 @@
from cugraph.sampling.node2vec_random_walks import node2vec_random_walks
from cugraph.sampling.node2vec import node2vec
from cugraph.sampling.uniform_neighbor_sample import uniform_neighbor_sample
from cugraph.sampling.homogeneous_neighbor_sample import homogeneous_neighbor_sample
from cugraph.sampling.heterogeneous_neighbor_sample import heterogeneous_neighbor_sample
Loading
Loading