
Commit 7d5c64b

correctly handle corner case barrier rdd errors (#993)
We currently have an optimization that skips the repartition when input dfs have the same number of partitions as num_workers. This can be problematic when the df has ancestors with a different number of partitions before a shuffle boundary, which is incompatible with barrier rdd logic. There are two fixes:

- For the algos that collect the barrier rdd results to get model training results, the default path is taken and is retried with a hash repartitioning (introducing a shuffle boundary) if a barrier error is encountered.
- For knn and dbscan, which return the barrier rdd after converting it to a df, there is no opportunity to retry since the calling user code would encounter this error. In this case, the only option is to repartition the input in all cases.

---------

Signed-off-by: Erik Ordentlich <[email protected]>
Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
1 parent bf4ad1d commit 7d5c64b
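For context, the corner case is reproducible with plain PySpark, independent of spark-rapids-ml. The sketch below is a minimal standalone illustration (not part of this commit; a local session and toy partition counts are assumed): an RDD whose partition count already matches the number of workers can still be rejected by Spark's barrier execution mode if an ancestor RDD has a different partition count with no shuffle boundary in between, while a hash repartition introduces that boundary and avoids the error.

# Minimal standalone sketch of the corner case (assumed local PySpark session;
# not part of this commit).
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# 2 partitions (== the number of "workers" here), but the ancestor RDD has 3
# partitions and there is no shuffle boundary in between.
narrowed = sc.parallelize(range(6), 3).coalesce(2)
try:
    narrowed.barrier().mapPartitions(lambda it: it).collect()
except Exception as e:
    # Spark rejects this RDD chain for a barrier stage; the commit checks for
    # this same marker string in the error message.
    print("BarrierJobUnsupportedRDDChainException" in str(e))

# repartition() is a hash repartition with a shuffle boundary, so the barrier
# stage only sees the 2 post-shuffle partitions and the job runs.
shuffled = sc.parallelize(range(6), 3).repartition(2)
print(shuffled.barrier().mapPartitions(lambda it: it).collect())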

File tree

4 files changed: +62, -7 lines changed

python/src/spark_rapids_ml/clustering.py
python/src/spark_rapids_ml/core.py
python/src/spark_rapids_ml/knn.py
python/tests/test_kmeans.py

python/src/spark_rapids_ml/clustering.py

Lines changed: 6 additions & 0 deletions
@@ -1094,6 +1094,12 @@ def _chunk_arr(

         default_num_partitions = dataset.rdd.getNumPartitions()

+        # we must hash repartition here to create a shuffle boundary for barrier rdd to work in all cases, even if
+        # dataset.rdd.getNumPartitions() == self.num_workers as dataset parents might have different numbers of partitions.
+        # a try around barrier is not possible here since the barrier logic is not executed
+        # until the caller does something with the lazily returned dataframe.
+        dataset = dataset.repartition(self.num_workers)
+
         rdd = self._call_cuml_fit_func(
             dataset=dataset,
             partially_collect=False,

python/src/spark_rapids_ml/core.py

Lines changed: 21 additions & 6 deletions
@@ -1192,16 +1192,31 @@ def _fit_internal(
         self, dataset: DataFrame, paramMaps: Optional[Sequence["ParamMap"]]
     ) -> List["_CumlModel"]:
         """Fit multiple models according to the parameters maps"""
-        pipelined_rdd = self._call_cuml_fit_func(
-            dataset=dataset,
-            partially_collect=True,
-            paramMaps=paramMaps,
-        )

         self.logger.info(
             f"Training spark-rapids-ml with {self.num_workers} worker(s) ..."
         )
-        rows = pipelined_rdd.collect()
+        try:
+            pipelined_rdd = self._call_cuml_fit_func(
+                dataset=dataset,
+                partially_collect=True,
+                paramMaps=paramMaps,
+            )
+            rows = pipelined_rdd.collect()
+        except Exception as e:
+            if "BarrierJobUnsupportedRDDChainException" in str(e):
+                self.logger.warning(
+                    "Barrier rdd error encountered with input dataset. Retrying with repartitioning."
+                )
+                pipelined_rdd = self._call_cuml_fit_func(
+                    dataset=dataset.repartition(self.num_workers),
+                    partially_collect=True,
+                    paramMaps=paramMaps,
+                )
+                rows = pipelined_rdd.collect()
+            else:
+                raise
+
         self.logger.info("Finished training")

         models: List["_CumlModel"] = [None]  # type: ignore
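Read in isolation, the change to _fit_internal is an instance of a small eager-retry pattern: try the barrier job with the input partitioning as-is, and only pay for a shuffle when Spark rejects the RDD chain. The sketch below is a hypothetical, simplified restatement (the run_barrier_job callable stands in for _call_cuml_fit_func plus collect; it is not an API of the library):

# Hypothetical sketch of the eager-retry pattern used above; not library code.
from typing import Callable, List
from pyspark.sql import DataFrame, Row


def collect_with_barrier_retry(
    dataset: DataFrame,
    num_workers: int,
    run_barrier_job: Callable[[DataFrame], List[Row]],
) -> List[Row]:
    try:
        # Fast path: keep the existing partitioning.
        return run_barrier_job(dataset)
    except Exception as e:
        if "BarrierJobUnsupportedRDDChainException" not in str(e):
            raise
        # The input's lineage is incompatible with a barrier stage; a hash
        # repartition adds a shuffle boundary and makes the retry succeed.
        return run_barrier_job(dataset.repartition(num_workers))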

python/src/spark_rapids_ml/knn.py

Lines changed: 8 additions & 1 deletion
@@ -628,9 +628,16 @@ def select_cols_for_cuml_fit(df_origin: DataFrame) -> DataFrame:

         df_item_for_nns = select_cols_for_cuml_fit(self._processed_item_df)
         df_query_for_nns = select_cols_for_cuml_fit(processed_query_df)
-        union_df = df_item_for_nns.union(df_query_for_nns)
+
+        # we must hash repartition here to create a shuffle boundary for barrier rdd to work in all cases, even if
+        # union_df.rdd.getNumPartitions() == self.num_workers as union_df parents might have different numbers of partitions.
+        # a try around barrier is not possible here since the barrier logic is not executed
+        # until the caller does something with the lazily returned dataframe.
+        union_df = df_item_for_nns.union(df_query_for_nns).repartition(self.num_workers)

         pipelinedrdd = self._call_cuml_fit_func(union_df, partially_collect=False)
+
+        # this creates another shuffle boundary for the barrier rdd to work in all cases.
         pipelinedrdd = pipelinedrdd.repartition(query_default_num_partitions)  # type: ignore

         query_id_col_name = f"query_{self._getIdColOrDefault()}"
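The comment about laziness above is why knn (and the analogous change in clustering.py for dbscan) cannot use the retry from core.py: the barrier stage only runs when the caller performs an action on the returned dataframe, so there is no collect inside the library to wrap in a try/except. A standalone sketch (plain PySpark, not this library's code) of that timing:

# Standalone sketch (assumed local PySpark session): the unsupported-RDD-chain
# error is raised when a job is submitted, not when the chain is defined.
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[2]").getOrCreate()
sc = spark.sparkContext

# Defining the barrier chain is lazy; nothing is raised here, so there is
# nothing for library code to catch and retry at this point.
lazy_result = sc.parallelize(range(6), 3).coalesce(2).barrier().mapPartitions(lambda it: it)

# The error only surfaces later, in "caller" code, when an action runs the job.
try:
    lazy_result.count()
except Exception as e:
    print("BarrierJobUnsupportedRDDChainException" in str(e))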

python/tests/test_kmeans.py

Lines changed: 27 additions & 0 deletions
@@ -282,6 +282,33 @@ def assert_cuml_spark_model(
     ).collect()


+def test_kmeans_basic_repartition(
+    gpu_number: int, tmp_path: str, caplog: LogCaptureFixture
+) -> None:
+    # reduce the number of GPUs for toy dataset to avoid empty partition
+    gpu_number = min(gpu_number, 2)
+    data = [[1.0, 1.0], [1.0, 2.0], [3.0, 2.0], [4.0, 3.0]]
+
+    with CleanSparkSession() as spark:
+        df = (
+            spark.sparkContext.parallelize(data, gpu_number + 1)
+            .map(lambda row: (row,))
+            .toDF(["features"])
+            .coalesce(gpu_number)
+        )
+        kmeans = (
+            KMeans(num_workers=gpu_number, n_clusters=2)
+            .setFeaturesCol("features")
+            .setSeed(0)
+        )
+
+        kmeans_model = kmeans.fit(df)
+        assert (
+            "Barrier rdd error encountered with input dataset. Retrying with repartitioning."
+            in caplog.text
+        )
+
+
 @pytest.mark.parametrize("data_type", ["byte", "short", "int", "long"])
 def test_kmeans_numeric_type(gpu_number: int, data_type: str) -> None:
     # reduce the number of GPUs for toy dataset to avoid empty partition
