@@ -638,6 +638,111 @@ def _cuml_fit(df: CumlInputType, params: Dict[str, Any]) -> Dict[str, Any]:
638638 """
639639 raise NotImplementedError ()
640640
+    def _skip_stage_level_scheduling(self, spark_version: str, conf: SparkConf) -> bool:
+        """Check if stage-level scheduling is not needed;
+        return True to skip stage-level scheduling."""
+
+        if spark_version < "3.4.0":
+            self.logger.info(
+                "Stage-level scheduling in spark-rapids-ml requires spark version 3.4.0+"
+            )
+            return True
+
+        if "3.4.0" <= spark_version < "3.5.1" and not _is_standalone_or_localcluster(
+            conf
+        ):
+            self.logger.info(
+                "For Spark %s, Stage-level scheduling in spark-rapids-ml requires spark "
+                "standalone or local-cluster mode",
+                spark_version,
+            )
+            return True
+
+        executor_cores = conf.get("spark.executor.cores")
+        executor_gpus = conf.get("spark.executor.resource.gpu.amount")
+        if executor_cores is None or executor_gpus is None:
+            self.logger.info(
+                "Stage-level scheduling in spark-rapids-ml requires spark.executor.cores, "
+                "spark.executor.resource.gpu.amount to be set."
+            )
+            return True
+
+        if int(executor_cores) == 1:
+            # there will be only 1 task running at any time.
+            self.logger.info(
+                "Stage-level scheduling in spark-rapids-ml requires spark.executor.cores > 1"
+            )
+            return True
+
+        if int(executor_gpus) > 1:
+            # For spark.executor.resource.gpu.amount > 1, we assume the user knows how to
+            # configure Spark so that spark-rapids-ml runs successfully.
+            self.logger.info(
+                "Stage-level scheduling in spark-rapids-ml will not work "
+                "when spark.executor.resource.gpu.amount > 1"
+            )
+            return True
+
+        task_gpu_amount = conf.get("spark.task.resource.gpu.amount")
+
+        if task_gpu_amount is None:
+            # ETL tasks will not grab a gpu when spark.task.resource.gpu.amount is not set,
+            # but with stage-level scheduling we can make the training tasks grab the gpu.
+            return False
+
+        if float(task_gpu_amount) == float(executor_gpus):
+            # spark.executor.resource.gpu.amount == spark.task.resource.gpu.amount
+            # results in only 1 task running at a time, which may cause a perf issue.
+            return True
+
+        # We can enable stage-level scheduling
+        return False
+
+    def _try_stage_level_scheduling(self, rdd: RDD) -> RDD:
+        ss = _get_spark_session()
+        sc = ss.sparkContext
+
+        if _is_local(sc) or self._skip_stage_level_scheduling(ss.version, sc.getConf()):
+            return rdd
+
+        # executor_cores will not be None
+        executor_cores = ss.sparkContext.getConf().get("spark.executor.cores")
+        assert executor_cores is not None
+
+        from pyspark.resource.profile import ResourceProfileBuilder
+        from pyspark.resource.requests import TaskResourceRequests
+
+        # Each training task requires more cpu cores than half of the executor cores, which
+        # ensures that each training task is sent to a different executor.
+        #
+        # Please note that we can't set task_cores to a value smaller than half of the
+        # executor cores, because task_gpus alone can't ensure the tasks are sent to
+        # different executors, even with task_gpus=1.0.
+        #
+        # If spark-rapids is enabled, we don't allow other ETL tasks to run alongside the
+        # training task, to avoid OOM.
+        spark_plugins = ss.conf.get("spark.plugins", " ")
+        assert spark_plugins is not None
+        spark_rapids_sql_enabled = ss.conf.get("spark.rapids.sql.enabled", "true")
+        assert spark_rapids_sql_enabled is not None
+
+        task_cores = (
+            int(executor_cores)
+            if "com.nvidia.spark.SQLPlugin" in spark_plugins
+            and "true" == spark_rapids_sql_enabled.lower()
+            else (int(executor_cores) // 2) + 1
+        )
+        # task_gpus means how many slots per gpu address the task requires; it does not mean
+        # how many gpus the task would like to require, so it can be any value in (0, 0.5] or 1.
+        task_gpus = 1.0
+
+        treqs = TaskResourceRequests().cpus(task_cores).resource("gpu", task_gpus)
+        rp = ResourceProfileBuilder().require(treqs).build
+
+        self.logger.info(
+            f"Training tasks require the resource(cores={task_cores}, gpu={task_gpus})"
+        )
+
+        return rdd.withResources(rp)
+
     def _call_cuml_fit_func(
         self,
         dataset: DataFrame,
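
For context on the new `_try_stage_level_scheduling` above: it uses PySpark's stage-level scheduling API, i.e. build a `TaskResourceRequests`, wrap it in a `ResourceProfile`, and attach it to the RDD with `withResources` so only the training stage picks up the new requests. Below is a minimal, self-contained sketch of that pattern; the session setup and the 2-core / 1.0-gpu amounts are illustrative assumptions, not values taken from this diff, and actually running a job with a custom profile still requires a cluster that supports stage-level scheduling.

```python
from pyspark.sql import SparkSession
from pyspark.resource.profile import ResourceProfileBuilder
from pyspark.resource.requests import TaskResourceRequests

spark = SparkSession.builder.getOrCreate()

# Describe what each task in the next stage needs: here 2 CPU cores and a full
# GPU slot per task (both numbers are placeholders for this sketch).
treqs = TaskResourceRequests().cpus(2).resource("gpu", 1.0)

# ResourceProfileBuilder.build is a property, not a method, hence no parentheses.
rp = ResourceProfileBuilder().require(treqs).build

rdd = spark.sparkContext.parallelize(range(8), numSlices=2)

# Attach the profile; only the stages that compute this RDD use the new requests.
scheduled_rdd = rdd.withResources(rp)
print(scheduled_rdd.getResourceProfile())
```
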
@@ -911,6 +1016,8 @@ def _train_udf(pdf_iter: Iterator[pd.DataFrame]) -> pd.DataFrame:
             .mapPartitions(lambda x: x)
         )
 
+        pipelined_rdd = self._try_stage_level_scheduling(pipelined_rdd)
+
         return pipelined_rdd
 
     def _fit_array_order(self) -> _ArrayOrder:
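
Tying the two hunks together, the resource profile is only applied when `_skip_stage_level_scheduling` returns False, i.e. roughly under a configuration like the one sketched below. This is an illustrative setup, not one mandated by the diff; the master URL and the specific core/GPU-fraction numbers are assumptions.

```python
from pyspark.sql import SparkSession

# Illustrative config that passes every check in _skip_stage_level_scheduling:
# Spark 3.4.0+ standalone, more than 1 executor core, exactly 1 GPU per executor,
# and a fractional task GPU amount so ETL tasks can share the GPU.
spark = (
    SparkSession.builder.master("spark://master:7077")  # assumed standalone master URL
    .config("spark.executor.cores", "8")
    .config("spark.executor.resource.gpu.amount", "1")
    .config("spark.task.resource.gpu.amount", "0.125")
    .config("spark.plugins", "com.nvidia.spark.SQLPlugin")  # optional: spark-rapids ETL
    .getOrCreate()
)
```
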
@@ -1081,111 +1188,6 @@ def fitMultipleModels() -> List["_CumlModel"]:
         else:
             return super().fitMultiple(dataset, paramMaps)
 
-    def _skip_stage_level_scheduling(self, spark_version: str, conf: SparkConf) -> bool:
-        """Check if stage-level scheduling is not needed;
-        return True to skip stage-level scheduling."""
-
-        if spark_version < "3.4.0":
-            self.logger.info(
-                "Stage-level scheduling in spark-rapids-ml requires spark version 3.4.0+"
-            )
-            return True
-
-        if "3.4.0" <= spark_version < "3.5.1" and not _is_standalone_or_localcluster(
-            conf
-        ):
-            self.logger.info(
-                "For Spark %s, Stage-level scheduling in spark-rapids-ml requires spark "
-                "standalone or local-cluster mode",
-                spark_version,
-            )
-            return True
-
-        executor_cores = conf.get("spark.executor.cores")
-        executor_gpus = conf.get("spark.executor.resource.gpu.amount")
-        if executor_cores is None or executor_gpus is None:
-            self.logger.info(
-                "Stage-level scheduling in spark-rapids-ml requires spark.executor.cores, "
-                "spark.executor.resource.gpu.amount to be set."
-            )
-            return True
-
-        if int(executor_cores) == 1:
-            # there will be only 1 task running at any time.
-            self.logger.info(
-                "Stage-level scheduling in spark-rapids-ml requires spark.executor.cores > 1"
-            )
-            return True
-
-        if int(executor_gpus) > 1:
-            # For spark.executor.resource.gpu.amount > 1, we assume the user knows how to
-            # configure Spark so that spark-rapids-ml runs successfully.
-            self.logger.info(
-                "Stage-level scheduling in spark-rapids-ml will not work "
-                "when spark.executor.resource.gpu.amount > 1"
-            )
-            return True
-
-        task_gpu_amount = conf.get("spark.task.resource.gpu.amount")
-
-        if task_gpu_amount is None:
-            # ETL tasks will not grab a gpu when spark.task.resource.gpu.amount is not set,
-            # but with stage-level scheduling we can make the training tasks grab the gpu.
-            return False
-
-        if float(task_gpu_amount) == float(executor_gpus):
-            # spark.executor.resource.gpu.amount == spark.task.resource.gpu.amount
-            # results in only 1 task running at a time, which may cause a perf issue.
-            return True
-
-        # We can enable stage-level scheduling
-        return False
-
-    def _try_stage_level_scheduling(self, rdd: RDD) -> RDD:
-        ss = _get_spark_session()
-        sc = ss.sparkContext
-
-        if _is_local(sc) or self._skip_stage_level_scheduling(ss.version, sc.getConf()):
-            return rdd
-
-        # executor_cores will not be None
-        executor_cores = ss.sparkContext.getConf().get("spark.executor.cores")
-        assert executor_cores is not None
-
-        from pyspark.resource.profile import ResourceProfileBuilder
-        from pyspark.resource.requests import TaskResourceRequests
-
-        # Each training task requires more cpu cores than half of the executor cores, which
-        # ensures that each training task is sent to a different executor.
-        #
-        # Please note that we can't set task_cores to a value smaller than half of the
-        # executor cores, because task_gpus alone can't ensure the tasks are sent to
-        # different executors, even with task_gpus=1.0.
-        #
-        # If spark-rapids is enabled, we don't allow other ETL tasks to run alongside the
-        # training task, to avoid OOM.
-        spark_plugins = ss.conf.get("spark.plugins", " ")
-        assert spark_plugins is not None
-        spark_rapids_sql_enabled = ss.conf.get("spark.rapids.sql.enabled", "true")
-        assert spark_rapids_sql_enabled is not None
-
-        task_cores = (
-            int(executor_cores)
-            if "com.nvidia.spark.SQLPlugin" in spark_plugins
-            and "true" == spark_rapids_sql_enabled.lower()
-            else (int(executor_cores) // 2) + 1
-        )
-        # task_gpus means how many slots per gpu address the task requires; it does not mean
-        # how many gpus the task would like to require, so it can be any value in (0, 0.5] or 1.
-        task_gpus = 1.0
-
-        treqs = TaskResourceRequests().cpus(task_cores).resource("gpu", task_gpus)
-        rp = ResourceProfileBuilder().require(treqs).build
-
-        self.logger.info(
-            f"Training tasks require the resource(cores={task_cores}, gpu={task_gpus})"
-        )
-
-        return rdd.withResources(rp)
-
     def _fit_internal(
         self, dataset: DataFrame, paramMaps: Optional[Sequence["ParamMap"]]
     ) -> List["_CumlModel"]:
@@ -1196,8 +1198,6 @@ def _fit_internal(
             paramMaps=paramMaps,
         )
 
-        pipelined_rdd = self._try_stage_level_scheduling(pipelined_rdd)
-
         self.logger.info(
             f"Training spark-rapids-ml with {self.num_workers} worker(s) ..."
         )