6 changes: 6 additions & 0 deletions python/src/spark_rapids_ml/clustering.py
@@ -1094,6 +1094,12 @@ def _chunk_arr(

default_num_partitions = dataset.rdd.getNumPartitions()

+        # We must repartition here to create a shuffle boundary so that the barrier RDD works in all
+        # cases, even when dataset.rdd.getNumPartitions() == self.num_workers, since the dataset's
+        # parents might have different numbers of partitions. A try/except around the barrier call is
+        # not possible here because the barrier logic does not execute until the caller acts on the
+        # lazily returned DataFrame.
+        dataset = dataset.repartition(self.num_workers)

rdd = self._call_cuml_fit_func(
dataset=dataset,
partially_collect=False,
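For readers unfamiliar with barrier execution mode: Spark rejects barrier stages whose RDD chain contains partition-altering narrow dependencies such as coalesce() or union(), raising BarrierJobUnsupportedRDDChainException, whereas a shuffle (e.g. from repartition()) starts a fresh stage and sidesteps the check. A minimal sketch of that behavior follows; the local session setup and toy lineage are illustrative assumptions, not code from this PR.

from pyspark import BarrierTaskContext
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()

# coalesce() narrows partitions via a narrow dependency; if it lands inside
# the barrier stage, Spark raises BarrierJobUnsupportedRDDChainException
df = spark.range(8).coalesce(2)

# repartition() forces a shuffle, so the barrier stage starts at a clean
# boundary with exactly the requested number of partitions
df = df.repartition(2)

def fit_partition(rows):
    ctx = BarrierTaskContext.get()
    ctx.barrier()  # all tasks in the stage rendezvous here
    yield sum(1 for _ in rows)

print(df.rdd.barrier().mapPartitions(fit_partition).collect())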
27 changes: 21 additions & 6 deletions python/src/spark_rapids_ml/core.py
@@ -1192,16 +1192,31 @@ def _fit_internal(
self, dataset: DataFrame, paramMaps: Optional[Sequence["ParamMap"]]
) -> List["_CumlModel"]:
"""Fit multiple models according to the parameters maps"""
-        pipelined_rdd = self._call_cuml_fit_func(
-            dataset=dataset,
-            partially_collect=True,
-            paramMaps=paramMaps,
-        )

        self.logger.info(
            f"Training spark-rapids-ml with {self.num_workers} worker(s) ..."
        )
-        rows = pipelined_rdd.collect()
+        try:
+            pipelined_rdd = self._call_cuml_fit_func(
+                dataset=dataset,
+                partially_collect=True,
+                paramMaps=paramMaps,
+            )
+            rows = pipelined_rdd.collect()
+        except Exception as e:
+            if "BarrierJobUnsupportedRDDChainException" in str(e):
+                self.logger.warning(
+                    "Barrier rdd error encountered with input dataset. Retrying with repartitioning."
+                )
+                pipelined_rdd = self._call_cuml_fit_func(
+                    dataset=dataset.repartition(self.num_workers),
+                    partially_collect=True,
+                    paramMaps=paramMaps,
+                )
+                rows = pipelined_rdd.collect()
+            else:
+                raise

self.logger.info("Finished training")

models: List["_CumlModel"] = [None] # type: ignore
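The shape of this change generalizes: run the barrier fit against the dataset as given, and pay the shuffle cost of a repartition only when Spark rejects the RDD chain. A minimal sketch of that pattern, where fit_fn is a hypothetical stand-in for _call_cuml_fit_func followed by collect(), not an API from this PR:

from typing import Callable, List
from pyspark.sql import DataFrame

def fit_with_barrier_retry(
    fit_fn: Callable[[DataFrame], List], dataset: DataFrame, num_workers: int
) -> List:
    try:
        # the eager action inside fit_fn (e.g. collect()) must run inside the
        # try, otherwise the lazily built barrier stage never executes here
        return fit_fn(dataset)
    except Exception as e:
        # Py4J exposes the JVM exception class only via the message string,
        # hence the substring check instead of an isinstance test
        if "BarrierJobUnsupportedRDDChainException" not in str(e):
            raise
        # repartition() inserts a shuffle boundary; retry pays that cost
        # only when the original lineage was rejected
        return fit_fn(dataset.repartition(num_workers))

Note that this try/except works only because the action runs inside the try; as the comments in clustering.py and knn.py point out, those call sites hand a lazy DataFrame back to the caller, so the pattern is not available there and an unconditional repartition is used instead.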
9 changes: 8 additions & 1 deletion python/src/spark_rapids_ml/knn.py
@@ -628,9 +628,16 @@ def select_cols_for_cuml_fit(df_origin: DataFrame) -> DataFrame:

df_item_for_nns = select_cols_for_cuml_fit(self._processed_item_df)
df_query_for_nns = select_cols_for_cuml_fit(processed_query_df)
-        union_df = df_item_for_nns.union(df_query_for_nns)

+        # We must repartition here to create a shuffle boundary so that the barrier RDD works in all
+        # cases, even when union_df.rdd.getNumPartitions() == self.num_workers, since union_df's
+        # parents might have different numbers of partitions. A try/except around the barrier call is
+        # not possible here because the barrier logic does not execute until the caller acts on the
+        # lazily returned DataFrame.
+        union_df = df_item_for_nns.union(df_query_for_nns).repartition(self.num_workers)

pipelinedrdd = self._call_cuml_fit_func(union_df, partially_collect=False)

+        # This creates another shuffle boundary so that the barrier RDD works in all cases.
pipelinedrdd = pipelinedrdd.repartition(query_default_num_partitions) # type: ignore

query_id_col_name = f"query_{self._getIdColOrDefault()}"
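To see why the union site needs the explicit shuffle even when the partition counts happen to line up, consider this sketch; it assumes an active SparkSession named spark, and the toy frames are illustrative rather than taken from the PR.

item_df = spark.range(6).repartition(3)
query_df = spark.range(4).repartition(1)

# union() concatenates its parents' partitions without a shuffle, so the
# result has 3 + 1 = 4 partitions but an unshuffled Union lineage
union_df = item_df.union(query_df)
print(union_df.rdd.getNumPartitions())  # 4

# even if 4 happened to equal num_workers, a barrier stage would still see
# the union in its RDD chain; repartition() inserts the required shuffle
union_df = union_df.repartition(4)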
27 changes: 27 additions & 0 deletions python/tests/test_kmeans.py
@@ -282,6 +282,33 @@ def assert_cuml_spark_model(
).collect()


+def test_kmeans_basic_repartition(
+    gpu_number: int, tmp_path: str, caplog: LogCaptureFixture
+) -> None:
+    # reduce the number of GPUs for toy dataset to avoid empty partition
+    gpu_number = min(gpu_number, 2)
+    data = [[1.0, 1.0], [1.0, 2.0], [3.0, 2.0], [4.0, 3.0]]
+
+    with CleanSparkSession() as spark:
+        # coalesce() leaves an unshuffled lineage in front of the barrier
+        # stage, which forces the repartition retry path in _fit_internal
+        df = (
+            spark.sparkContext.parallelize(data, gpu_number + 1)
+            .map(lambda row: (row,))
+            .toDF(["features"])
+            .coalesce(gpu_number)
+        )
+        kmeans = (
+            KMeans(num_workers=gpu_number, n_clusters=2)
+            .setFeaturesCol("features")
+            .setSeed(0)
+        )
+
+        kmeans_model = kmeans.fit(df)
+        assert (
+            "Barrier rdd error encountered with input dataset. Retrying with repartitioning."
+            in caplog.text
+        )


@pytest.mark.parametrize("data_type", ["byte", "short", "int", "long"])
def test_kmeans_numeric_type(gpu_number: int, data_type: str) -> None:
# reduce the number of GPUs for toy dataset to avoid empty partition